You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/06/22 01:47:03 UTC

[pulsar] branch master updated: [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 33cf2d09502 [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)
33cf2d09502 is described below

commit 33cf2d09502cec160dcf637786dd5b8fb5669343
Author: Zike Yang <zi...@apache.org>
AuthorDate: Wed Jun 22 09:46:55 2022 +0800

    [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)
    
    ### Motivation
    
    This is the same problem that occurs when a consumer performs an inclusive seek to a chunked message.
    
    See more detail in [PIP-107](https://github.com/apache/pulsar/issues/12402)
    
    ### Modifications
    
    * When the given startMessageId is a chunk message ID (ChunkMessageIdImpl), use its first chunk's message ID as the startMessageId when creating the consumer/reader.
---
 .../org/apache/pulsar/client/impl/MessageChunkingTest.java    | 11 ++++++++++-
 .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java |  9 ++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 85d67c3de0d..00e6c2f78e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -520,6 +520,15 @@ public class MessageChunkingTest extends ProducerConsumerBase {
             assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
         }
 
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topicName)
+                .startMessageIdInclusive()
+                .startMessageId(msgIds.get(1))
+                .create();
+
+        Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
+        assertEquals(msgIds.get(1), readMsg.getMessageId());
+
         consumer1.close();
         consumer2.close();
         producer.close();
@@ -549,5 +558,5 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         }
         return str.toString();
     }
- 
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9e86770ee8f..3e25ba0facb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -249,7 +249,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
-        this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
+        if (startMessageId != null) {
+            if (startMessageId instanceof ChunkMessageIdImpl) {
+                this.startMessageId = new BatchMessageIdImpl(
+                        ((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
+            } else {
+                this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
+            }
+        }
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
         AVAILABLE_PERMITS_UPDATER.set(this, 0);