You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 11:53:30 UTC

[pulsar] 09/09: [ISSUE-12291][Client] 'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ce8f4505cfc799a74d15f786542cf9639ef33393
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Nov 4 23:18:10 2021 +0800

    [ISSUE-12291][Client]  'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)
    
    Fixes #12291
    
    ### Motivation
    
    Bug fix. 'StartMessageId' and 'RollbackDuration' is not working in MultiTopicsReader for non-partitioned topics.
    
    ### Modifications
    
    This fix is quite simple. Just add `startMessageId` and `startMessageRollbackDurationInSec` when creating underlying consumer with `ConsumerImpl.newConsumerImpl`
    
    (cherry picked from commit cb48152254b8c16596e7251ef9a7229d918d2e90)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 102 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   8 +-
 3 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 31a426e..a8a6ced 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -378,6 +377,107 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
     }
 
+    @Test(timeOut = 20000)
+    public void testMultiNonPartitionedTopicWithStartMessageId() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+        final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic1, topic2);
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+        // create producer and send msg
+        List<Producer<String>> producerList = new ArrayList<>();
+        for (String topicName : topics) {
+            producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+        }
+        int msgNum = 10;
+        Set<String> messages = new HashSet<>();
+        for (int i = 0; i < producerList.size(); i++) {
+            Producer<String> producer = producerList.get(i);
+            for (int j = 0; j < msgNum; j++) {
+                String msg = i + "msg" + j;
+                producer.send(msg);
+                messages.add(msg);
+            }
+        }
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .startMessageId(MessageId.earliest)
+                .topics(topics).readerName("my-reader").create();
+        // receive messages
+        while (reader.hasMessageAvailable()) {
+            messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+        }
+        assertEquals(messages.size(), 0);
+        assertEquals(client.consumersCount(), 1);
+        // clean up
+        for (Producer<String> producer : producerList) {
+            producer.close();
+        }
+        reader.close();
+        Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+    }
+
+    @Test(timeOut = 20000)
+    public void testMultiNonPartitionedTopicWithRollbackDuration() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+        final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic1, topic2);
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+        // create producer and send msg
+        List<Producer<String>> producerList = new ArrayList<>();
+        for (String topicName : topics) {
+            producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+        }
+        int totalMsg = 10;
+        Set<String> messages = new HashSet<>();
+        long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5); // 5 hours old
+        long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); // 5 hours old
+        for (int i = 0; i < producerList.size(); i++) {
+            Producer<String> producer = producerList.get(i);
+            // (1) Publish 10 messages with publish-time 5 HOUR back
+            for (int j = 0; j < totalMsg; j++) {
+                TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+                        .value(i + "-old-msg-" + j);
+                msg.getMetadataBuilder()
+                        .setPublishTime(oldMsgPublishTime)
+                        .setProducerName(producer.getProducerName())
+                        .setReplicatedFrom("us-west1");
+                msg.send();
+                messages.add(msg.getMessage().getValue());
+            }
+            // (2) Publish 10 messages with publish-time 1 HOUR back
+            for (int j = 0; j < totalMsg; j++) {
+                TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+                        .value(i + "-new-msg-" + j);
+                msg.getMetadataBuilder()
+                        .setPublishTime(newMsgPublishTime)
+                        .setProducerName(producer.getProducerName())
+                        .setReplicatedFrom("us-west1");
+                msg.send();
+                messages.add(msg.getMessage().getValue());
+            }
+        }
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .startMessageFromRollbackDuration(2, TimeUnit.HOURS)
+                .topics(topics).readerName("my-reader").create();
+        // receive messages
+        while (reader.hasMessageAvailable()) {
+            messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+        }
+        assertEquals(messages.size(), 2 * totalMsg);
+        for (String message : messages) {
+            assertTrue(message.contains("old-msg"));
+        }
+        assertEquals(client.consumersCount(), 1);
+        // clean up
+        for (Producer<String> producer : producerList) {
+            producer.close();
+        }
+        reader.close();
+        Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+    }
+
     @Test(timeOut = 10000)
     public void testKeyHashRangeReader() throws Exception {
         final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b29d6c7..88b6df1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2000,7 +2000,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (lastDequeuedMessageId == MessageId.earliest) {
             // if we are starting from latest, we should seek to the actual last message first.
             // allow the last one to be read when read head inclusively.
-            if (startMessageId.equals(MessageId.latest)) {
+            if (MessageId.latest.equals(startMessageId)) {
 
                 CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
                 // if the consumer is configured to read inclusive then we need to seek to the last message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fa7c4a2..18bd7bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -998,8 +998,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 } else {
                     ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                             client.externalExecutorProvider(), -1,
-                            true, subFuture, null, schema, interceptors,
-                            createIfDoesNotExist);
+                            true, subFuture, startMessageId, schema, interceptors,
+                            createIfDoesNotExist, startMessageRollbackDurationInSec);
 
                     synchronized (pauseMutex) {
                         if (paused) {
@@ -1294,8 +1294,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                                 client, partitionName, configurationData,
                                 client.externalExecutorProvider(),
-                                partitionIndex, true, subFuture, null, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */);
+                                partitionIndex, true, subFuture, startMessageId, schema, interceptors,
+                                true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();