Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/19 06:07:30 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #16664: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

eolivelli commented on code in PR #16664:
URL: https://github.com/apache/pulsar/pull/16664#discussion_r924087799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -893,7 +893,18 @@ private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, M
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handleAsync((result, ex) -> {
+                            if (ex != null) {
+                                int rc = BKException.getExceptionCode(ex);
+                                if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                    return null;

Review Comment:
   I think we should log something here at INFO level, so that we can see when (and whether) this happens in production.
   This should not flood the logs, because I expect it to happen only rarely.
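
   A minimal sketch of the kind of logging meant here, not the actual patch. It uses only JDK types: NoSuchLedgerException and ignoreMissingLedger are hypothetical stand-ins for the two BKException code checks in the diff, and java.util.logging stands in for whatever logger the class already uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Logger;

class DeleteLedgerLoggingSketch {
    private static final Logger LOG = Logger.getLogger(DeleteLedgerLoggingSketch.class.getName());

    // Hypothetical stand-in for the two "no such ledger" BKException codes checked in the diff.
    static class NoSuchLedgerException extends RuntimeException {
        NoSuchLedgerException(long ledgerId) {
            super("No such ledger: " + ledgerId);
        }
    }

    // Treats "ledger already gone" as success, logs it at INFO, and propagates any other failure.
    static CompletableFuture<Void> ignoreMissingLedger(CompletableFuture<Void> deletion,
                                                       String managedLedgerName, long ledgerId) {
        return deletion.<Void>handle((result, ex) -> {
            if (ex == null) {
                return null;
            }
            // Dependent stages wrap failures in CompletionException, so unwrap before checking.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
            if (cause instanceof NoSuchLedgerException) {
                LOG.info(String.format("[%s] Ledger %d was already deleted, ignoring",
                        managedLedgerName, ledgerId));
                return null;
            }
            throw new CompletionException(cause);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> alreadyGone = new CompletableFuture<>();
        alreadyGone.completeExceptionally(new NoSuchLedgerException(42L));
        // Completes normally; the INFO line is the only trace of the missing ledger.
        ignoreMissingLedger(alreadyGone, "prop/crash-broker/my-topic", 42L).join();
    }
}
```

   Including the managed ledger name and ledger id in the message keeps the line useful when it shows up in production logs.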



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {

Review Comment:
   Why aren't we failing the test when createTenant/createNamespace fails?
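
   One possible shape for the setup, as a sketch only: tolerate the "already exists" case and let every other error fail the test. ConflictException, Setup and createIfAbsent are hypothetical names introduced for illustration; they are not the Pulsar admin API.

```java
class TestSetupSketch {
    // Hypothetical stand-in for an "already exists" error reported by the admin client.
    static class ConflictException extends Exception {
        ConflictException(String msg) {
            super(msg);
        }
    }

    // A setup step that may throw, such as creating a tenant or a namespace.
    interface Setup {
        void run() throws Exception;
    }

    // Returns true if the resource was created and false if it already existed.
    // Any other exception propagates, so a broken setup fails the test immediately.
    static boolean createIfAbsent(Setup create) throws Exception {
        try {
            create.run();
            return true;
        } catch (ConflictException alreadyExists) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(createIfAbsent(() -> { }));
        System.out.println(createIfAbsent(() -> {
            throw new ConflictException("tenant already exists");
        }));
    }
}
```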



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {
+
+        }
+        final String ns1 = tenant + "/crash-broker";
+        try {
+            admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+        } catch (Exception e) {
+
+        }
+
+        final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+        // Create subscription
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+                .receiverQueueSize(5).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // Create multiple data-ledger
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Create multiple data-ledger
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 10 data-ledgers each with 20 entries under managed-ledger
+        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+        long firstLedgerToDelete = lastLedger.getKey();
+
+        // (2) delete first 4 data-ledgers
+        ledgerInfo.entrySet().forEach(entry -> {
+            if (!entry.equals(lastLedger)) {
+                assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+                try {
+                    bookKeeper.deleteLedger(entry.getKey());
+                } catch (Exception e) {
+                    e.printStackTrace();

Review Comment:
   nit: use a logger instead of printStackTrace()



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java:
##########
@@ -61,15 +85,18 @@ public InvalidImplementationException(String msg) {
      */
     public static class NotFoundException extends MetadataStoreException {
         public NotFoundException() {
-            super((Throwable) null);
+            super(makeBkFriendlyException(

Review Comment:
   My understanding is that MetadataStoreException is about the ZooKeeper/etcd/RocksDB metadata stores,
   so NotFound means something like "znode does not exist".
   
   Why do we need to always inject a BKException as the cause?
   
   Can we do it only when we are using PulsarLedgerManager/ManagedLedger?
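
   A rough sketch of the alternative, assuming the translation can live at the ledger-manager boundary. All types here are stand-ins written for illustration: NotFoundException mimics the generic metadata-store error with no BookKeeper cause, NoSuchLedgerException mimics the BookKeeper one, and translateForBk is a hypothetical helper name.

```java
import java.util.concurrent.CompletionException;

class LedgerManagerBoundarySketch {
    // Stand-in for the generic metadata-store "not found" error; it knows nothing about BookKeeper.
    static class NotFoundException extends Exception {
        NotFoundException(String path) {
            super("Not found: " + path);
        }
    }

    // Stand-in for the BookKeeper "no such ledger on metadata server" exception.
    static class NoSuchLedgerException extends Exception {
        NoSuchLedgerException(Throwable cause) {
            super("No such ledger exists on metadata server", cause);
        }
    }

    // Called only from the ledger-manager layer: maps the generic error to the one the
    // BK client understands and leaves every other failure untouched.
    static Throwable translateForBk(Throwable ex) {
        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause() : ex;
        return cause instanceof NotFoundException ? new NoSuchLedgerException(cause) : cause;
    }

    public static void main(String[] args) {
        Throwable translated = translateForBk(new NotFoundException("/ledgers/7"));
        System.out.println(translated.getClass().getSimpleName());
    }
}
```

   With this shape the generic NotFoundException stays free of BookKeeper details for all other metadata-store users.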


