You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/06 21:36:43 UTC

(pulsar) branch branch-3.0 updated: [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5b62d4b221f [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)
5b62d4b221f is described below

commit 5b62d4b221fb0686c93911c13998fe3ad2689689
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 7 05:36:36 2024 +0800

    [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   4 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  10 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  12 +-
 .../broker/service/persistent/PersistentTopic.java |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |   6 +-
 .../broker/service/ReplicatorSubscriptionTest.java |   2 +
 .../pulsar/broker/transaction/TransactionTest.java |   1 +
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  28 ++++
 .../apache/pulsar/compaction/CompactionTest.java   | 164 ++++++++++++++++++++-
 9 files changed, 218 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1ffdf6d2d7..bc6a1e9a782 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -517,6 +517,10 @@ public interface ManagedCursor {
      */
     void rewind();
 
+    default void rewind(boolean readCompacted) {
+        rewind();
+    }
+
     /**
      * Move the cursor to a different read position.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 10e72f709fe..7fd93dacf49 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -677,7 +677,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                  LedgerHandle recoveredFromCursorLedger) {
         // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
         // we need to move to the next existing ledger
-        if (!ledger.ledgerExists(position.getLedgerId())) {
+        if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
             Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
             if (nextExistingLedger == null) {
                 log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
@@ -2518,9 +2518,15 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void rewind() {
+        rewind(false);
+    }
+
+    @Override
+    public void rewind(boolean readCompacted) {
         lock.writeLock().lock();
         try {
-            PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
+            PositionImpl newReadPosition =
+                    readCompacted ? markDeletePosition.getNext() : ledger.getNextValidPosition(markDeletePosition);
             PositionImpl oldReadPosition = readPosition;
 
             log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index eacc568f0a4..bf6482bda01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -105,9 +105,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Rewind cursor and read more entries without delay", name);
             }
-            cursor.rewind();
-
             Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+
             notifyActiveConsumerChanged(activeConsumer);
             readMoreEntries(activeConsumer);
             return;
@@ -125,9 +125,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
                         serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
             }
-            cursor.rewind();
-
             Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+
             notifyActiveConsumerChanged(activeConsumer);
             readMoreEntries(activeConsumer);
             readOnActiveConsumerTask = null;
@@ -198,7 +198,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 log.debug("[{}] rewind because no available consumer found", name);
             }
             entries.forEach(Entry::release);
-            cursor.rewind();
+            cursor.rewind(currentConsumer != null ? currentConsumer.readCompacted() : readConsumer.readCompacted());
             if (currentConsumer != null) {
                 notifyActiveConsumerChanged(currentConsumer);
                 readMoreEntries(currentConsumer);
@@ -293,7 +293,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         }
         cursor.cancelPendingReadRequest();
         havePendingRead = false;
-        cursor.rewind();
+        cursor.rewind(consumer.readCompacted());
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e694e063774..24a223fe82a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -977,7 +977,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
-            InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
+                                                                   InitialPosition initialPosition,
+                                                                   long startMessageRollbackDurationSec,
+                                                                   boolean replicated,
                                                                    Map<String, String> subscriptionProperties) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -987,7 +989,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
-
         ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
                 new OpenCursorCallback() {
             @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 22520654135..b9066530998 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -96,7 +96,11 @@ public class CompactedTopicImpl implements CompactedTopic {
                                        ReadEntriesCallback callback, Consumer consumer) {
         synchronized (this) {
             PositionImpl cursorPosition;
-            if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
+            boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
+                && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
+                || cursor.getMarkDeletedPosition() == null
+                || cursor.getMarkDeletedPosition().getEntryId() == -1L);
+            if (readFromEarliest){
                 cursorPosition = PositionImpl.EARLIEST;
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index fe519827be7..4cc3a9ada7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -868,6 +869,7 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
                 .topic(topicName)
                 .subscriptionName("sub2")
                 .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .readCompacted(true)
                 .subscribe();
         List<String> result = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c947ba27069..1ad5ea14e0a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1877,6 +1877,7 @@ public class TransactionTest extends TransactionTestBase {
                 .topic(topic)
                 .subscriptionName("sub")
                 .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .readCompacted(true)
                 .subscribe();
         List<String> result = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 951f99af1a4..2f91d792581 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -733,4 +733,32 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         admin.topics().deletePartitionedTopic(partitionedTopic);
     }
 
+    @Test
+    public void testReaderReconnectedFromNextEntry() throws Exception {
+        final String topic = "persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                .startMessageId(MessageId.earliest).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        // Send 3 and consume 1.
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg1.getValue(), "1");
+
+        // Trigger reader reconnect.
+        admin.topics().unload(topic);
+
+        // For non-durable we are going to restart from the next entry.
+        Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg2.getValue(), "2");
+        Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+        assertEquals(msg3.getValue(), "3");
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 545dd97675c..d3de2187d95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1878,6 +1878,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
                 .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
         //Give some time to consume
@@ -1915,6 +1916,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
                 .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -2175,9 +2177,11 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         });
 
         @Cleanup
-        Consumer<String> consumer =
-                pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
-                        .subscribe();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionName("sub-2")
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
 
         List<String> result = new ArrayList<>();
         while (true) {
@@ -2191,4 +2195,158 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
 
         Assert.assertEquals(result, List.of("V3", "V4", "V5"));
     }
+
+    @Test
+    public void testAcknowledgeWithReconnection() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+        final String subName = "my-sub";
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+            expected.add(String.valueOf(i));
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        // trim the topic
+        admin.topics().unload(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+            assertEquals(internalStats.numberOfEntries, 0);
+        });
+
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+                .topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+
+        List<String> results = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            results.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
+                        5));
+
+        // Make consumer reconnect to broker
+        admin.topics().unload(topicName);
+
+        // Wait for consumer to reconnect and clear incomingMessages
+        consumer.pause();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(consumer.numMessagesInQueue(), 0);
+        });
+        consumer.resume();
+
+        for (int i = 0; i < 5; i++) {
+            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            results.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
+                        0));
+
+        Assert.assertEquals(results, expected);
+
+        Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        // Make consumer reconnect to broker
+        admin.topics().unload(topicName);
+
+        producer.newMessage().key("K").value("V").send();
+        Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertEquals(message2.getValue(), "V");
+        consumer.acknowledge(message2);
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
+            Assert.assertEquals(internalStats.lastConfirmedEntry,
+                    internalStats.cursors.get(subName).markDeletePosition);
+        });
+
+        consumer.close();
+        producer.close();
+    }
+
+    @Test
+    public void testEarliestSubsAfterRollover() throws Exception {
+        final String topicName = "persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" + UUID.randomUUID();
+        final String subName = "my-sub";
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topicName).create();
+
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+            expected.add(String.valueOf(i));
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        // trim the topic
+        admin.topics().unload(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+            assertEquals(internalStats.numberOfEntries, 0);
+        });
+
+        // Make ml.getFirstPosition() return new ledger first position
+        producer.newMessage().key("K").value("V").send();
+        expected.add("V");
+
+        @Cleanup
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+                .topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+
+        List<String> results = new ArrayList<>();
+        while (true) {
+            Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+
+            results.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        Assert.assertEquals(results, expected);
+    }
 }