You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/15 06:58:15 UTC

[pulsar] branch master updated: [cpp client] Bugfix prevent dup consumer for same topic subscription (#3748)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fff02e2  [cpp client] Bugfix prevent dup consumer for same topic subscription (#3748)
fff02e2 is described below

commit fff02e2aa2064412dbae18b973eb2bb2abab25d8
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Fri Mar 15 03:58:10 2019 -0300

    [cpp client] Bugfix prevent dup consumer for same topic subscription (#3748)
    
    Same fix as #3746 but for cpp client.
    
      - Filter consumers for the same topic name.
      - Add test to verify that same subscription names with different topics are
        allowed to be different consumers subscription instead of reused.
---
 pulsar-client-cpp/lib/ClientImpl.cc          |  3 ++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc | 32 +++++++++++++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 7cc818b..0a8aaa6 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -344,7 +344,8 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
             ConsumersList consumers(consumers_);
             for (auto& weakPtr : consumers) {
                 ConsumerImplBasePtr consumer = weakPtr.lock();
-                if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) {
+                if (consumer && consumer->getSubscriptionName() == consumerName &&
+                    consumer->getTopic() == topic && !consumer->isClosed()) {
                     lock.unlock();
                     LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
                     callback(ResultOk, Consumer(consumer));
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 52c2800..9ce89f7 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2909,4 +2909,34 @@ TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
     ASSERT_FALSE(res != 204 && res != 409);
 
     testNegativeAcks(topicName, true);
-}
\ No newline at end of file
+}
+
+TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string subsName = "my-only-sub";
+    std::string topicName =
+        "persistent://public/default/testPreventDupConsumersAllowSameSubForDifferentTopics";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+
+    Consumer consumerA;
+    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+    ASSERT_EQ(ResultOk, resultA);
+    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+    Consumer consumerB;
+    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+    ASSERT_EQ(ResultOk, resultB);
+    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+    Consumer consumerC;
+    Result resultC = client.subscribe(topicName + "-different-topic", subsName, consumerConf, consumerC);
+    ASSERT_EQ(ResultOk, resultB);
+    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+    ASSERT_EQ(ResultOk, consumerA.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
+
+    // consumer C should be a different instance from A and B and should be with open state.
+    ASSERT_EQ(ResultOk, consumerC.close());
+}