You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/27 03:40:09 UTC

[pulsar] branch master updated: [Issue #3226][cpp client] Prevent dup consumers on same client cnx (#3403)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 762e0ab  [Issue #3226][cpp client] Prevent dup consumers on same client cnx (#3403)
762e0ab is described below

commit 762e0ab9a52e2665a106766c211d45474adc833b
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Sun Jan 27 00:40:03 2019 -0300

    [Issue #3226][cpp client] Prevent dup consumers on same client cnx (#3403)
    
    Prevent same consumer subscription over the same client connection for cpp
    client. For more details please see explanation on commit 44e1a23.
    
    ### Motivation
    
    Fix #3226 for cpp client.
    
    ### Modifications
    
    Add check for `ClientImpl.cc` to know whether new shared consumer subscribe is already present
    on consumers vector or not.
    
    For further explanation please see #3312
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
      - Add unit test asserting if duplicated consumers are allowed for shared
        subscription.
      - Add test preventing broker metadata error by closing a duplicate consumer
        over the same connection.
---
 pulsar-client-cpp/lib/ClientImpl.cc          | 10 ++++++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc | 47 ++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 1932182..8fa9120 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -343,6 +343,16 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
             lock.unlock();
             callback(ResultInvalidConfiguration, Consumer());
             return;
+        } else if (conf.getConsumerType() == ConsumerShared) {
+            ConsumersList consumers(consumers_);
+            for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
+                ConsumerImplBasePtr consumer = it->lock();
+                if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) {
+                    lock.unlock();
+                    callback(ResultOk, Consumer(consumer));
+                    return;
+                }
+            }
         }
     }
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 9a2af10..ffd5df4 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2772,3 +2772,50 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
     ASSERT_EQ(count, 0);
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string subsName = "my-only-sub";
+    std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+
+    Consumer consumerA;
+    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+    ASSERT_EQ(ResultOk, resultA);
+    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+    Consumer consumerB;
+    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+    ASSERT_EQ(ResultOk, resultB);
+    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+    // Since this is a shared consumer over same client cnx
+    // closing consumerA should result in consumerB also being closed.
+    ASSERT_EQ(ResultOk, consumerA.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
+}
+
+TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string subsName = "my-only-sub";
+    std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
+    ConsumerConfiguration consumerConf;
+    consumerConf.setConsumerType(ConsumerShared);
+
+    Consumer consumerA;
+    Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+    ASSERT_EQ(ResultOk, resultA);
+    ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+    Consumer consumerB;
+    Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+    ASSERT_EQ(ResultOk, resultB);
+    ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+    ASSERT_EQ(ResultOk, consumerA.unsubscribe());
+    // If dup consumers are allowed BrokerMetadataError will be the result of close()
+    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
+}