You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/15 06:58:15 UTC
[pulsar] branch master updated: [cpp client] Bugfix prevent dup
consumer for same topic subscription (#3748)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fff02e2 [cpp client] Bugfix prevent dup consumer for same topic subscription (#3748)
fff02e2 is described below
commit fff02e2aa2064412dbae18b973eb2bb2abab25d8
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Fri Mar 15 03:58:10 2019 -0300
[cpp client] Bugfix prevent dup consumer for same topic subscription (#3748)
Same fix as #3746 but for cpp client.
- When looking for an existing consumer to reuse, also require that its topic name matches.
- Add a test verifying that subscribing with the same subscription name on
different topics creates distinct consumers instead of reusing an existing one.
---
pulsar-client-cpp/lib/ClientImpl.cc | 3 ++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 32 +++++++++++++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 7cc818b..0a8aaa6 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -344,7 +344,8 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
ConsumersList consumers(consumers_);
for (auto& weakPtr : consumers) {
ConsumerImplBasePtr consumer = weakPtr.lock();
- if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) {
+ if (consumer && consumer->getSubscriptionName() == consumerName &&
+ consumer->getTopic() == topic && !consumer->isClosed()) {
lock.unlock();
LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
callback(ResultOk, Consumer(consumer));
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 52c2800..9ce89f7 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2909,4 +2909,34 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
ASSERT_FALSE(res != 204 && res != 409);
testNegativeAcks(topicName, true);
-}
\ No newline at end of file
+}
+
+// Subscribing twice to the same topic with the same subscription name must hand back
+// the already-existing consumer, while the same subscription name on a different
+// topic must produce an independent consumer.
+TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
+    Client client(lookupUrl);
+    std::string subsName = "my-only-sub";
+    std::string topicName =
+        "persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+
+    Consumer consumerA;
+    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+    ASSERT_EQ(ResultOk, resultA);
+    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+    // Same topic and same subscription name as consumer A.
+    Consumer consumerB;
+    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+    ASSERT_EQ(ResultOk, resultB);
+    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+    // Same subscription name, but on a different topic. Check the result and the
+    // handle of THIS subscribe call (consumer C), not those of consumer B again.
+    Consumer consumerC;
+    Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC);
+    ASSERT_EQ(ResultOk, resultC);
+    ASSERT_EQ(consumerC.getSubscriptionName(), subsName);
+
+    // Closing A is expected to close B as well: the second close reports
+    // ResultAlreadyClosed, showing that A and B refer to the same consumer.
+    ASSERT_EQ(ResultOk, consumerA.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
+
+    // consumer C should be a different instance from A and B and should be with open state.
+    ASSERT_EQ(ResultOk, consumerC.close());
+}