You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:59 UTC

[pulsar] 07/09: Fix seek at batchIndex level receive duplicated messages (#11826)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6763969dac1b9133c9b80130a00fa0f01051b8c4
Author: Aloys <lo...@gmail.com>
AuthorDate: Thu Sep 2 15:46:35 2021 +0800

    Fix seek at batchIndex level receive duplicated messages (#11826)
    
    Fixes #11825
    
    ### Motivation
    If AcknowledgmentAtBatchIndexLevelEnabled, consumer may receive duplicated messages after seek to batchIndex.
    This pull request tries to resolve this problem.
    
    ### Modifications
    Set the `duringSeek` before reveived seek response.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    SubscriptionSeekTest.testSeekForBatchMessageAndSpecifiedBatchIndex
    
    (cherry picked from commit fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2)
---
 .../broker/service/SubscriptionSeekTest.java       | 83 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++-
 2 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 22e7135..cd584ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -182,6 +183,88 @@ public class SubscriptionSeekTest extends BrokerTestBase {
             assertEquals(receiveId, messageId);
         }
     }
+
+    @Test
+    public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
+        final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
+        String subscriptionName = "my-subscription-batch";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .topic(topicName).create();
+
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        producer.close();
+
+        assertTrue(messageIds.get(0) instanceof  BatchMessageIdImpl);
+        assertTrue(messageIds.get(1) instanceof  BatchMessageIdImpl);
+        assertTrue(messageIds.get(2) instanceof  BatchMessageIdImpl);
+
+        BatchMessageIdImpl batchMsgId0 = (BatchMessageIdImpl) messageIds.get(0);
+        BatchMessageIdImpl batchMsgId1 = (BatchMessageIdImpl) messageIds.get(1);
+        BatchMessageIdImpl msgIdToSeekFirst = (BatchMessageIdImpl) messageIds.get(2);
+
+        assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
+        assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());
+
+        PulsarClient newPulsarClient = PulsarClient.builder()
+                // set start backoff interval short enough to make sure client will re-connect quickly
+                .startingBackoffInterval(1, TimeUnit.MICROSECONDS)
+                .serviceUrl(lookupUrl.toString())
+                .build();
+
+        org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .startMessageIdInclusive()
+                .subscribe();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+        assertEquals(topicRef.getSubscriptions().size(), 1);
+
+        consumer.seek(msgIdToSeekFirst);
+        MessageId msgId = consumer.receive().getMessageId();
+        assertTrue(msgId instanceof BatchMessageIdImpl);
+        BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+        assertEquals(batchMsgId, msgIdToSeekFirst);
+
+
+        consumer.seek(MessageId.earliest);
+        Message<String> receiveBeforEarliest = consumer.receive();
+        assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+        consumer.seek(MessageId.latest);
+        Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
+        assertNull(receiveAfterLatest);
+
+        for (MessageId messageId : messageIds) {
+            consumer.seek(messageId);
+            MessageId receiveId = consumer.receive().getMessageId();
+            assertEquals(receiveId, messageId);
+        }
+
+        newPulsarClient.close();
+    }
+
     @Test
     public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
         final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7fa039..7a20ff4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1802,20 +1802,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
-        log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
+        BatchMessageIdImpl originSeekMessageId = seekMessageId;
+        seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
+        duringSeek.set(true);
+        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
             acknowledgmentsGroupingTracker.flushAndClean();
 
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
-            duringSeek.set(true);
             lastDequeuedMessageId = MessageId.earliest;
 
             clearIncomingMessages();
             seekFuture.complete(null);
         }).exceptionally(e -> {
+            // re-set duringSeek and seekMessageId if seek failed
+            seekMessageId = originSeekMessageId;
+            duringSeek.set(false);
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+
             seekFuture.completeExceptionally(
                 PulsarClientException.wrap(e.getCause(),
                     String.format("Failed to seek the subscription %s of the topic %s to %s",