You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/05 11:26:55 UTC

[pulsar] 02/03: Fix data lost when using earliest position to subscribe to a topic (#11547)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fdc5eab8eb285922cd86fab3b5b998e815b22d72
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Aug 4 08:19:20 2021 +0800

    Fix data lost when using earliest position to subscribe to a topic (#11547)
    
    When subscribing to a topic with earliest position, the ManagedLedger always using
    the last position to init the cursor. If the no cursor update happens and the broker restarts
    or topic been unloaded or the topic ownership changed, will lead to the data lost, the unacked messages
    will not redeliver to the consumer again.
    
    The root cause is if we are using the last position to init the cursor, the cursor will update the
    mark delete position as the last position first to the Zookeeper, if the cursor can't a chance to
    update the mark delete position again before been closed, when recoving the cursor again, will using
    the mark delete posiion that stored in the Zookeeper, so the issue happens.
    
    The fix is to add check for the initial position of the cursor, if we are using the Earliest as the initial position,
    use the first position to init the cursor.
    
    The new added test can cover the changes, and without this change, the test would failed.
    
    (cherry picked from commit 035a6bab7af8ed17f811c16b518dc02eea2435a1)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 54 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0e4a82c..5a5fe49 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -791,7 +791,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
         CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
-        cursor.initialize(getLastPosition(), properties, new VoidCallback() {
+        PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
+        cursor.initialize(position, properties, new VoidCallback() {
             @Override
             public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index e828598..f594d4e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -36,6 +38,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Sets;
 
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertEquals;
 
@@ -179,4 +182,55 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test(timeOut = 30000)
+    public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception {
+
+        final String subName = "my-subscriber-name";
+        final String topicName = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
+        final int messages = 100;
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        List<CompletableFuture<MessageId>> sendResults = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            sendResults.add(producer.sendAsync("Hello - " + i));
+        }
+        producer.flush();
+
+        FutureUtil.waitForAll(sendResults).get();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        List<Message<String>> received = new ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        assertEquals(received.size(), messages);
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        admin.topics().unload(topicName);
+
+        // The consumer does not ack any messages, so after unloading the topic,
+        // the consumer should get the unacked messages again
+
+        received.clear();
+        for (int i = 0; i < messages; i++) {
+            received.add(consumer.receive());
+        }
+
+        assertEquals(received.size(), messages);
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        consumer.close();
+        producer.close();
+    }
 }