You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 07:28:54 UTC
[pulsar] branch master updated: [C++] Subscription InitialPosition
is not correctly set on regex consumers (#6810)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0b803a8 [C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)
0b803a8 is described below
commit 0b803a83fbbe873020cdf18f8259ea92584492ec
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue May 12 00:28:40 2020 -0700
[C++] Subscription InitialPosition is not correctly set on regex consumers (#6810)
### Motivation
The subscription `InitialPosition` is not currently set when using the multi-topic or regex consumers in C++/Python.
That makes that if you try to start from `MessageId::earliest`, it would be ignored.
* [C++] Subscription InitialPosition is not correctly set on regex consumers
* fix test fail for topic name
* fix `make format`
Co-authored-by: Jia Zhai <zh...@apache.org>
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 6 +---
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 36 ++++++++++++++++++++++++
2 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index e7102f7..6168a14 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -157,13 +157,9 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
}
std::shared_ptr<ConsumerImpl> consumer;
- ConsumerConfiguration config;
+ ConsumerConfiguration config = conf_.clone();
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
- // all the consumers should have same name.
- config.setConsumerName(conf_.getConsumerName());
- config.setConsumerType(conf_.getConsumerType());
- config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 26bdee5..b27d0c6 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3033,6 +3033,42 @@ TEST(BasicEndToEndTest, testRegexTopicsWithMessageListener) {
}
}
+TEST(BasicEndToEndTest, testRegexTopicsWithInitialPosition) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+
+ std::string topicName =
+ "persistent://public/default/test-regex-initial-position-" + std::to_string(time(NULL));
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(MessageBuilder().setContent("test-" + std::to_string(i)).build());
+ }
+
+ std::string subsName = "testRegexTopicsWithMessageListener-sub";
+ std::string pattern = topicName + ".*";
+
+ // Subscription gets created after messages are produced but it will start from the beginning of the topic
+ ConsumerConfiguration consumerConf;
+ consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
+
+ Consumer consumer;
+ result = client.subscribeWithRegex(pattern, subsName, consumerConf, consumer);
+ ASSERT_EQ(ResultOk, result);
+ ASSERT_EQ(consumer.getSubscriptionName(), subsName);
+
+ for (int i = 0; i < 10; i++) {
+ Message msg;
+ Result res = consumer.receive(msg);
+ ASSERT_EQ(ResultOk, result);
+ }
+
+ client.close();
+}
+
TEST(BasicEndToEndTest, testPartitionedTopicWithOnePartition) {
ClientConfiguration config;
Client client(lookupUrl);