You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:45:05 UTC

[pulsar] 14/17: [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4095bdf556494684c890813af3ab8806f2ac9405
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 12 00:28:40 2020 -0700

    [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)
    
    ### Motivation
    
    The subscription `InitialPosition` is not currently set when using the multi-topic or regex consumers in C++/Python.
    
    That makes that if you try to start from `MessageId::earliest`, it would be ignored.
    
    * [C++] Subscription InitialPosition is not correctly set on regex consumers
    
    * fix test fail for topic name
    
    * fix `make format`
    
    Co-authored-by: Jia Zhai <zh...@apache.org>(cherry picked from commit 0b803a83fbbe873020cdf18f8259ea92584492ec)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  6 +---
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 36 ++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index e837adf..34a0c90 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -157,13 +157,9 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     }
 
     std::shared_ptr<ConsumerImpl> consumer;
-    ConsumerConfiguration config;
+    ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
 
-    // all the consumers should have same name.
-    config.setConsumerName(conf_.getConsumerName());
-    config.setConsumerType(conf_.getConsumerType());
-    config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
                                         std::placeholders::_1, std::placeholders::_2));
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 5277ca4..e99f408 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3026,6 +3026,42 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
     }
 }
 
+TEST(BasicEndToEndTest, testRegexTopicsWithInitialPosition) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string topicName =
+        "persistent://public/default/test-regex-initial-position-" + std::to_string(time(NULL));
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(MessageBuilder().setContent("test-" + std::to_string(i)).build());
+    }
+
+    std::string subsName = "testRegexTopicsWithMessageListener-sub";
+    std::string pattern = topicName + ".*";
+
+    // Subscription gets created after messages are produced but it will start from the beginning of the topic
+    ConsumerConfiguration consumerConf;
+    consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
+
+    Consumer consumer;
+    result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subsName);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg;
+        Result res = consumer.receive(msg);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    client.close();
+}
+
 TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
     ClientConfiguration config;
     Client client(lookupUrl);