Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/19 02:10:39 UTC

[GitHub] [pulsar] dlg99 opened a new pull request, #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

dlg99 opened a new pull request, #16664:
URL: https://github.com/apache/pulsar/pull/16664

   ### Motivation
   
   In some situations, deleting a ManagedLedger can run into BookKeeper ledgers that have already been deleted. 
   Such cases are currently handled as errors even though they are safe to ignore.
   Currently, it is impossible to handle these cases because PulsarLedgerManager returns an error that cannot be mapped to a BK error code, and the end user ends up with an obscure `UnexpectedConditionException` (error code -999) that cannot be distinguished from the ledger-already-deleted case.
   
   ### Modifications
   
   1. Made PulsarLedgerManager compatible with BK errors so the BK client has a chance to return the correct error. I didn't do this for every MetadataStoreException, only for the ones where it made sense.
   2. Ignored NoSuchLedgerExistsOnMetadataServerException on delete, as it is safe to do so there.
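A minimal sketch of the idea in item 1, under stated assumptions: the metadata-level `NotFoundException` carries a shared, preallocated BK-coded cause, and the BK side walks the cause chain to recover a code, falling back to -999 (`UnexpectedConditionException`) when none is found. `BkCodedException`, `bkCodeOf`, and the `-42` code are hypothetical stand-ins, not the real BookKeeper or Pulsar API.

```java
import java.util.concurrent.CompletionException;

public class BkFriendlyCauseSketch {
    // Hypothetical stand-in for a BookKeeper exception that carries an error code.
    static class BkCodedException extends Exception {
        final int code;

        BkCodedException(int code, String msg) {
            // No suppression and no stack trace: cheap enough to share as a static cause.
            super(msg, null, false, false);
            this.code = code;
        }
    }

    static final int NO_SUCH_LEDGER = -42;          // illustrative placeholder, not BK's real value
    static final int UNEXPECTED_CONDITION = -999;   // the obscure fallback code from the PR text

    // Shared, preallocated cause: created once, never per throw.
    static final BkCodedException NO_SUCH_LEDGER_CAUSE =
            new BkCodedException(NO_SUCH_LEDGER, "No such ledger on metadata server");

    // Hypothetical stand-in for MetadataStoreException.NotFoundException.
    static class NotFoundException extends Exception {
        NotFoundException() {
            super(NO_SUCH_LEDGER_CAUSE);
        }
    }

    // Walk the cause chain; return the first BK code found, else the fallback code.
    static int bkCodeOf(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof BkCodedException) {
                return ((BkCodedException) cur).code;
            }
        }
        return UNEXPECTED_CONDITION;
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(new NotFoundException());
        System.out.println(bkCodeOf(wrapped));                           // -42
        System.out.println(bkCodeOf(new IllegalStateException("boom"))); // -999
    }
}
```

Because the cause is a single static instance, each `NotFoundException` only stores a reference to it; the chain walk is a short loop over `getCause()`.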
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added unit tests, e.g. `testTruncateCorruptDataLedger` in `BrokerBkEnsemblesTests`*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
   Nothing that I can think of.
   BK error codes seen by internal components can change (on purpose) to become more specific, but the MetadataStoreException types did not change.
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] dlg99 commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r925010868


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {

Review Comment:
   AFAICT these could already have been created by other test cases; I'm following the same pattern as the other test cases here.





[GitHub] [pulsar] dlg99 commented on pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 commented on PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#issuecomment-1198684969

   https://github.com/apache/pulsar/pull/16857 takes over




[GitHub] [pulsar] dlg99 commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r925031346


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -893,7 +893,18 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handleAsync((result, ex) -> {
+                            if (ex != null) {
+                                int rc = BKException.getExceptionCode(ex);
+                                if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                    return null;

Review Comment:
   fixed





[GitHub] [pulsar] lhotari commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r924052437


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -893,7 +893,18 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handleAsync((result, ex) -> {

Review Comment:
   Isn't it sufficient to use `.handle(` instead of `.handleAsync(`?
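A small standalone illustration of the difference being asked about, using only `java.util.concurrent`: both variants produce the same result, but `handle` runs the callback inline (on the caller if the future is already complete, otherwise on the completing thread), while `handleAsync` always hands it to an executor, by default the async pool (normally `ForkJoinPool.commonPool()`). The failed future here is a hypothetical stand-in for a ledger delete; for a cheap error-code check, the extra thread hop of `handleAsync` buys nothing.

```java
import java.util.concurrent.CompletableFuture;

public class HandleVsHandleAsync {
    // Recovery logic used with both variants: swallow the failure, return a marker.
    static String recover(String result, Throwable ex) {
        return ex != null ? "recovered" : result;
    }

    // Hypothetical stand-in for a delete operation that already failed.
    static CompletableFuture<String> failedDelete() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new RuntimeException("ledger already deleted"));
        return f;
    }

    // handle(): on an already-completed future the callback runs immediately
    // on the calling thread.
    static boolean handleRunsOnCaller() {
        Thread caller = Thread.currentThread();
        return failedDelete().handle((r, ex) -> Thread.currentThread() == caller).join();
    }

    public static void main(String[] args) {
        String viaHandle = failedDelete().handle(HandleVsHandleAsync::recover).join();
        // handleAsync(): the callback is always submitted to an executor, adding a thread hop.
        String viaHandleAsync = failedDelete().handleAsync(HandleVsHandleAsync::recover).join();
        System.out.println(viaHandle + " " + viaHandleAsync); // recovered recovered
        System.out.println(handleRunsOnCaller());             // true
    }
}
```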





[GitHub] [pulsar] dlg99 commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r925828232


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -921,7 +933,20 @@ private CompletableFuture<Void> deleteCursor(BookKeeper bkc, String managedLedge
 
         // Delete the cursor ledger if present
         if (cursor.cursorsLedgerId != -1) {
-            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute();
+            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId)
+                    .execute()
+                    .handle((result, ex) -> {
+                        if (ex != null) {

Review Comment:
   This does not work: in this case PulsarLedgerManager returns NotFoundException to BookKeeper's code, which remaps it into UnexpectedConditionException for the Pulsar callback.





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r925763647


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -921,7 +933,20 @@ private CompletableFuture<Void> deleteCursor(BookKeeper bkc, String managedLedge
 
         // Delete the cursor ledger if present
         if (cursor.cursorsLedgerId != -1) {
-            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute();
+            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId)
+                    .execute()
+                    .handle((result, ex) -> {
+                        if (ex != null) {

Review Comment:
   What about looking for NotFoundException in the exception chain?
   We should have some utility to traverse the chain and look for a specific class.
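A sketch of such a utility, assuming nothing beyond the JDK: it walks `getCause()` from the given throwable and returns the first instance of the requested class, so a `NotFoundException` buried under `CompletionException`/`ExecutionException` wrappers can still be detected. `findCause` and the local `NotFoundException` are hypothetical names for illustration; Pulsar may already ship a comparable helper.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class CauseChainSketch {
    // Stand-in for MetadataStoreException.NotFoundException.
    static class NotFoundException extends Exception {}

    /**
     * Hypothetical helper: returns the first throwable in the cause chain of t
     * (including t itself) that is an instance of type.
     */
    static <T extends Throwable> Optional<T> findCause(Throwable t, Class<T> type) {
        // Identity-based visited set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (type.isInstance(cur)) {
                return Optional.of(type.cast(cur));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable wrapped = new CompletionException(new ExecutionException(new NotFoundException()));
        System.out.println(findCause(wrapped, NotFoundException.class).isPresent());     // true
        System.out.println(findCause(wrapped, IllegalStateException.class).isPresent()); // false
    }
}
```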





[GitHub] [pulsar] dlg99 closed pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 closed pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client
URL: https://github.com/apache/pulsar/pull/16664




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r924087799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -893,7 +893,18 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handleAsync((result, ex) -> {
+                            if (ex != null) {
+                                int rc = BKException.getExceptionCode(ex);
+                                if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                    return null;

Review Comment:
   I think that we should log something at INFO level; this way we will see when/if this happens in production.
   We are not going to flood the logs, because I think this will happen only rarely.
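A minimal sketch of the suggestion, assuming a BK-coded exception type and placeholder codes: each delete future is wrapped so the two "no such ledger" codes complete normally with an INFO log line, while every other failure is re-thrown to the caller. `java.util.logging` stands in for the broker's logger; `tolerateMissing`, `codeOf`, `BkCodedException`, and the code values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Logger;

public class IgnoreMissingLedgerOnDelete {
    private static final Logger LOG = Logger.getLogger(IgnoreMissingLedgerOnDelete.class.getName());

    // Stand-in for BKException; the codes below are placeholders, not BK's real values.
    static class BkCodedException extends RuntimeException {
        final int code;

        BkCodedException(int code) {
            super("rc=" + code);
            this.code = code;
        }
    }

    static final int NO_SUCH_LEDGER = -42;
    static final int NO_SUCH_LEDGER_ON_METADATA_SERVER = -43;

    // First BK code in the cause chain, or -999 if none is present.
    static int codeOf(Throwable ex) {
        for (Throwable cur = ex; cur != null; cur = cur.getCause()) {
            if (cur instanceof BkCodedException) {
                return ((BkCodedException) cur).code;
            }
        }
        return -999;
    }

    // "Ledger already gone" completes normally (logged at INFO); anything else still fails.
    static CompletableFuture<Void> tolerateMissing(long ledgerId, CompletableFuture<Void> delete) {
        return delete.handle((result, ex) -> {
            if (ex == null) {
                return null;
            }
            int rc = codeOf(ex);
            if (rc == NO_SUCH_LEDGER || rc == NO_SUCH_LEDGER_ON_METADATA_SERVER) {
                LOG.info("Ledger " + ledgerId + " was already deleted, ignoring (rc=" + rc + ")");
                return null;
            }
            throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
        });
    }

    // Helper for the demo: a delete future that already failed with the given code.
    static CompletableFuture<Void> failed(int rc) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(new BkCodedException(rc));
        return f;
    }

    public static void main(String[] args) {
        tolerateMissing(1L, failed(NO_SUCH_LEDGER)).join(); // logs at INFO, completes normally
        System.out.println(tolerateMissing(2L, failed(-1)).isCompletedExceptionally()); // true
    }
}
```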



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {

Review Comment:
   Why aren't we failing if createTenant/createNamespace fails?
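One possible middle ground between the two positions in this thread, sketched under assumptions: tolerate only the "already exists" error (another test case created the tenant or namespace first) and let every other failure fail the test. The local `ConflictException` stands in for the admin client's HTTP 409 error, assumed here to be `PulsarAdminException.ConflictException`; `AdminCall` and `createIfAbsent` are hypothetical.

```java
public class CreateIfAbsentSketch {
    // Stand-in for the admin client's "already exists" (HTTP 409) error.
    static class ConflictException extends Exception {
        ConflictException(String msg) {
            super(msg);
        }
    }

    // An admin call that may throw, e.g. createTenant or createNamespace.
    @FunctionalInterface
    interface AdminCall {
        void run() throws Exception;
    }

    // Ignore only "already exists"; any other failure propagates and fails the test.
    static void createIfAbsent(AdminCall call) throws Exception {
        try {
            call.run();
        } catch (ConflictException e) {
            // Created earlier by another test case: safe to continue.
        }
    }

    public static void main(String[] args) throws Exception {
        createIfAbsent(() -> { throw new ConflictException("tenant prop already exists"); });
        try {
            createIfAbsent(() -> { throw new IllegalStateException("cluster not found"); });
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage()); // propagated: cluster not found
        }
    }
}
```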



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {
+
+        }
+        final String ns1 = tenant + "/crash-broker";
+        try {
+            admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+        } catch (Exception e) {
+
+        }
+
+        final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+        // Create subscription
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+                .receiverQueueSize(5).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // Create multiple data-ledger
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Get the BookKeeper client used by the managed ledger
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+        long firstLedgerToDelete = lastLedger.getKey();
+
+        // (2) delete first 4 data-ledgers
+        ledgerInfo.entrySet().forEach(entry -> {
+            if (!entry.equals(lastLedger)) {
+                assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+                try {
+                    bookKeeper.deleteLedger(entry.getKey());
+                } catch (Exception e) {
+                    e.printStackTrace();

Review Comment:
   nit: use logger



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java:
##########
@@ -61,15 +85,18 @@ public InvalidImplementationException(String msg) {
      */
     public static class NotFoundException extends MetadataStoreException {
         public NotFoundException() {
-            super((Throwable) null);
+            super(makeBkFriendlyException(

Review Comment:
   My understanding is that MetadataStoreException is about the ZooKeeper/etcd/RocksDB metadata stores,
   so NotFound means something like "znode does not exist".
   
   Why do we need to always inject a BKException as the cause?
   
   Can we do it only when we are using PulsarLedgerManager/ManagedLedger?
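A sketch of the alternative being asked about, under stated assumptions: leave MetadataStoreException untouched and translate only at the ledger-manager boundary, so other metadata-store users keep seeing the original exception. `toBk`, `BkCodedException`, and the `-42` code are hypothetical. The trade-off is that any callback relying on the original exception's cause chain (for example `ex.getCause().getCause()` being a MetadataStoreException) would now see a different type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class TranslateAtBoundary {
    // Stand-ins: a metadata-store "not found" error and a BK-coded error.
    static class NotFoundException extends Exception {}

    static class BkCodedException extends Exception {
        final int code;

        BkCodedException(int code, Throwable cause) {
            super("rc=" + code, cause);
            this.code = code;
        }
    }

    static final int NO_SUCH_LEDGER_ON_METADATA_SERVER = -42; // placeholder value
    static final int UNEXPECTED_CONDITION = -999;

    // Strip a CompletionException wrapper, if present.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Hypothetical adapter applied only inside the ledger-manager layer.
    static <T> CompletableFuture<T> toBk(CompletableFuture<T> metadataOp) {
        CompletableFuture<T> out = new CompletableFuture<>();
        metadataOp.whenComplete((value, ex) -> {
            if (ex == null) {
                out.complete(value);
            } else {
                Throwable cause = unwrap(ex);
                int rc = cause instanceof NotFoundException
                        ? NO_SUCH_LEDGER_ON_METADATA_SERVER : UNEXPECTED_CONDITION;
                out.completeExceptionally(new BkCodedException(rc, cause));
            }
        });
        return out;
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new NotFoundException());
        try {
            toBk(missing).join();
        } catch (CompletionException e) {
            System.out.println(((BkCodedException) e.getCause()).code); // -42
        }
    }
}
```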





[GitHub] [pulsar] dlg99 commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r925031166


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java:
##########
@@ -61,15 +85,18 @@ public InvalidImplementationException(String msg) {
      */
     public static class NotFoundException extends MetadataStoreException {
         public NotFoundException() {
-            super((Throwable) null);
+            super(makeBkFriendlyException(

Review Comment:
   I changed this to be more lightweight: no new exception is created, a static final exception is set as the cause, and traversing the exception chain is fast.
   
   There are several reasons why I decided to take this route:
   Despite Java's love of checked exceptions, a CompletableFuture in the API can be completed with any exception, so the BK API as implemented in Pulsar returns exceptions that BK cannot handle properly. There is no way for the compiler to strictly enforce an API contract that suits BK.
   
   As a result, removeLedgerMetadata() just returns whatever exception store.delete() produces, and so on.
   While I could remap the exception there into a BK-specific one, that could break some Pulsar code (like the callbacks that rely on ex.getCause().getCause() being a MetadataStoreException). I'd very much prefer not to go through the whole code base tracking down all possible gotchas, as I cannot guarantee that the tests cover all the cases.
   Alternatively, I'd have to inject the cause the same way I do now, but with more steps.
   Plus, there is MetadataStoreException.unwrap, which recreates the exception with the message but without the original exception.
   
   The current approach communicates the appropriate error to BK, so we no longer get UnexpectedConditionException in obvious cases and can properly handle basic errors. It does not add overhead (again, traversing the exception chain is fast), it is fool-proof enough that we don't have to worry about breaking it if LedgerManager is extended or changed, and it does not add mental overhead for Pulsar developers (no need to think about BK errors).
   
   Let me know if I am missing something obvious, or if there is another way to make this work.


