You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/10/08 14:19:18 UTC

[pulsar] branch master updated: [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ca779827581 [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)
ca779827581 is described below

commit ca77982758170993aa52c0f7f45bbf9ad72e368a
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sun Oct 8 22:19:12 2023 +0800

    [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)
    
    ### Background
    - After trimming ledgers, `ml.lastConfirmedPosition` refers to a ledger that has already been deleted when the current ledger of the ML is empty.
    - The cursor refuses to set `markDeletedPosition` to a value larger than `ml.lastConfirmedPosition`, even though there are no entries left to read<sup>[1]</sup>.
    - The comment in the method `advanceCursors` says: do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition`<sup>[2]</sup>.
    
    ### Issue
    If there is no durable cursor, the `markDeletedPosition` might be set to `{current_ledger, -1}`, and the `async mark delete` will then be rejected by the second rule above. As a result, the `backlog`, `readPosition`, and `markDeletedPosition` of the cursor end up in incorrect positions after trimming ledgers. You can reproduce it with the test `testTrimLedgerIfNoDurableCursor`.
    
    ### Modifications
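    When advancing non-durable cursors, move `cursor.markDeletedPosition` to the last existing entry of the trimmed ledgers, instead of `{firstNonDeletedLedger, -1}`. This way it is never larger than `ml.lastConfirmedPosition`. If all trimmed ledgers are empty, skip advancing the non-durable cursors.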
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  28 +++--
 .../client/api/NonDurableSubscriptionTest.java     | 115 ++++++++++++++++++++-
 2 files changed, 133 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d51b48bdda5..e011bf3e6d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2850,15 +2850,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return;
         }
 
-        // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
-        // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
-        // incorrect results
-        Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
-        if (firstNonDeletedLedger == null) {
-            throw new LedgerNotExistException("First non deleted Ledger is not found");
+        // Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so
+        // find the latest existing position to ack.
+        PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete);
+        if (highestPositionToDelete == null) {
+            log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}",
+                    name, ledgersToDelete);
+            return;
         }
-        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
-
         cursors.forEach(cursor -> {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
@@ -2882,6 +2881,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         });
     }
 
+    /**
+     * @return null if all ledgers is empty.
+     */
+    private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> ledgersToDelete) {
+        for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
+            LedgerInfo ledgerInfo = ledgersToDelete.get(i);
+            if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) {
+                return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            }
+        }
+        return null;
+    }
+
     /**
      * Delete this ManagedLedger completely from the system.
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index b5d00ac012a..20407295ccb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -33,6 +33,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
@@ -55,7 +56,7 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
 @Slf4j
-public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
+public class NonDurableSubscriptionTest extends ProducerConsumerBase {
 
     private final AtomicInteger numFlow = new AtomicInteger(0);
 
@@ -316,7 +317,7 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
     }
 
     @Test
-    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+    public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
         final String nonDurableCursor = "non-durable-cursor";
         final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
         Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
@@ -557,4 +558,114 @@ public class NonDurableSubscriptionTest  extends ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+        final String nonDurableCursor = "non-durable-cursor";
+        final String durableCursor = "durable-cursor";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topicName);
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                .subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                .subscriptionName(durableCursor).subscribe();
+        consumer.close();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        producer.send("4");
+        MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5");
+
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg1.getValue(), "1");
+        Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg3.getValue(), "3");
+
+        // Unsubscribe durable cursor.
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        admin.topics().unload(topicName);
+        Thread.sleep(3 * 1000);
+        admin.topics().deleteSubscription(topicName, durableCursor);
+        // Trim ledgers after release durable cursor.
+        trimLedgers(topicName);
+        List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topicName).ledgers;
+        assertEquals(ledgers.size(), 1);
+        assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId());
+
+        // Verify backlog and markDeletePosition is correct.
+        Awaitility.await().untilAsserted(() -> {
+            SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true)
+                    .getSubscriptions().get(nonDurableCursor);
+            log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+            assertEquals(subscriptionStats.getMsgBacklog(), 0);
+            ManagedLedgerInternalStats.CursorStats cursorStats =
+                    admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
+            PositionImpl actMarkDeletedPos =
+                    PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
+            PositionImpl expectedMarkDeletedPos =
+                    PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
+            log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
+            log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
+            Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+        });
+
+        // Clear the incoming queue of the reader for next test.
+        while (true) {
+            Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            log.info("clear msg: {}", msg.getValue());
+        }
+
+        // The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work
+        // after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor);
+
+        // Verify "getNumberOfEntries" if there is no entries to consume.
+        assertEquals(0, cursor.getNumberOfEntries());
+        assertEquals(0, ml.getNumberOfEntries());
+
+        // Verify "getNumberOfEntries" if there is 1 entry to consume.
+        producer.send("6");
+        producer.send("7");
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(2, ml.getNumberOfEntries());
+            // Since there is one message has been pulled into the incoming queue of reader. There is only one messages
+            // waiting to cursor read.
+            assertEquals(1, cursor.getNumberOfEntries());
+        });
+
+        // Verify "consumedEntries" is correct.
+        ManagedLedgerInternalStats.CursorStats cursorStats =
+                admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+        // "messagesConsumedCounter" should be 0 after unload the topic.
+        // Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this
+        //   cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to
+        //   the consumer.
+        assertEquals(0, cursorStats.messagesConsumedCounter);
+        Message<String> msg6 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg6.getValue(), "6");
+        Message<String> msg7 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg7.getValue(), "7");
+        Awaitility.await().untilAsserted(() -> {
+            // "messagesConsumedCounter" should be 2 after consumed 2 message.
+            ManagedLedgerInternalStats.CursorStats cStat =
+                    admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+            assertEquals(2, cStat.messagesConsumedCounter);
+        });
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
 }