You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/01 03:44:41 UTC

[GitHub] [pulsar] RobertIndie commented on a diff in pull request #16857: [fix][broker] PulsarLedgerManager to pass correct error code to BK client

RobertIndie commented on code in PR #16857:
URL: https://github.com/apache/pulsar/pull/16857#discussion_r934115168


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java:
##########
@@ -279,6 +286,147 @@ public void testSkipCorruptDataLedger() throws Exception {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {
+
+        }
+        final String ns1 = tenant + "/crash-broker";
+        try {
+            admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+        } catch (Exception e) {
+
+        }
+
+        final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+        // Create subscription
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+                .receiverQueueSize(5).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // Create multiple data-ledger
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Read the BookKeeper client instance out of the managed ledger
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+        long firstLedgerToDelete = lastLedger.getKey();
+
+        // (2) delete first 4 data-ledgers
+        ledgerInfo.entrySet().forEach(entry -> {
+            if (!entry.equals(lastLedger)) {
+                assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+                try {
+                    bookKeeper.deleteLedger(entry.getKey());
+                } catch (Exception e) {
+                    log.warn("failed to delete ledger {}", entry.getKey(), e);
+                }
+            }
+        });
+
+        // create 5 more ledgers
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message2-" + i;
+            producer.send(message.getBytes());
+        }
+
+        ml.delete();
+
+        // Admin should be able to truncate the topic
+        admin.topics().truncate(topic1);
+
+        ledgerInfo.entrySet().forEach(entry -> {
+            log.warn("found ledger: {}", entry.getKey());
+            assertNotEquals(firstLedgerToDelete, entry.getKey());
+        });
+
+        // Currently, ledger deletion is async, and a failed deletion
+        // does not actually fail truncation; it only logs an exception
+        // and schedules a task to retry
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            LedgerMetadata meta = bookKeeper
+                    .getLedgerMetadata(firstLedgerToDelete)
+                    .exceptionally(e -> null)
+                    .get();
+            assertEquals(null, meta, "ledger should be deleted " + firstLedgerToDelete);
+            });
+
+        // Should not throw, deleting absent ledger must be a noop
+        // unless PulsarLedgerManager returned a wrong error which
+        // got translated to BKUnexpectedConditionException
+        try {
+            bookKeeper.deleteLedger(firstLedgerToDelete);
+        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) {
+            // pass
+        }

Review Comment:
   ```suggestion
           try {
               bookKeeper.deleteLedger(firstLedgerToDelete);
               fail();
           } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) {
               // pass
           }
   ```
   
   Please use `fail()` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org