You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/27 03:40:09 UTC
[pulsar] branch master updated: [Issue #3226][cpp client] Prevent
dup consumers on same client cnx (#3403)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 762e0ab [Issue #3226][cpp client] Prevent dup consumers on same client cnx (#3403)
762e0ab is described below
commit 762e0ab9a52e2665a106766c211d45474adc833b
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Sun Jan 27 00:40:03 2019 -0300
[Issue #3226][cpp client] Prevent dup consumers on same client cnx (#3403)
Prevent same consumer subscription over the same client connection for cpp
client. For more details please see explanation on commit 44e1a23.
### Motivation
Fix #3226 for cpp client.
### Modifications
Add check for `ClientImpl.cc` to know whether new shared consumer subscribe is already present
on consumers vector or not.
For further explanation please see #3312
### Verifying this change
This change added tests and can be verified as follows:
- Add unit test asserting if duplicated consumers are allowed for shared
subscription.
- Add test preventing broker metadata error by closing a duplicate consumer
over the same connection.
---
pulsar-client-cpp/lib/ClientImpl.cc | 10 ++++++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 47 ++++++++++++++++++++++++++++
2 files changed, 57 insertions(+)
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 1932182..8fa9120 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -343,6 +343,16 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
lock.unlock();
callback(ResultInvalidConfiguration, Consumer());
return;
+ } else if (conf.getConsumerType() == ConsumerShared) {
+ ConsumersList consumers(consumers_);
+ for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
+ ConsumerImplBasePtr consumer = it->lock();
+ if (consumer && consumer->getSubscriptionName() == consumerName && !consumer->isClosed()) {
+ lock.unlock();
+ callback(ResultOk, Consumer(consumer));
+ return;
+ }
+ }
}
}
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 9a2af10..ffd5df4 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2772,3 +2772,50 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
ASSERT_EQ(count, 0);
client.shutdown();
}
+
+TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string subsName = "my-only-sub";
+ std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
+ ConsumerConfiguration consumerConf;
+ consumerConf.setConsumerType(ConsumerShared);
+
+ Consumer consumerA;
+ Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+ ASSERT_EQ(ResultOk, resultA);
+ ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+ Consumer consumerB;
+ Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+ ASSERT_EQ(ResultOk, resultB);
+ ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+ // Since this is a shared consumer over same client cnx
+ // closing consumerA should result in consumerB also being closed.
+ ASSERT_EQ(ResultOk, consumerA.close());
+ ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
+}
+
+TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string subsName = "my-only-sub";
+ std::string topicName = "persistent://public/default/test-prevent-dup-consumers";
+ ConsumerConfiguration consumerConf;
+ consumerConf.setConsumerType(ConsumerShared);
+
+ Consumer consumerA;
+ Result resultA = client.subscribe(topicName, subsName, consumerConf, consumerA);
+ ASSERT_EQ(ResultOk, resultA);
+ ASSERT_EQ(consumerA.getSubscriptionName(), subsName);
+
+ Consumer consumerB;
+ Result resultB = client.subscribe(topicName, subsName, consumerConf, consumerB);
+ ASSERT_EQ(ResultOk, resultB);
+ ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
+
+ ASSERT_EQ(ResultOk, consumerA.unsubscribe());
+ // If dup consumers are allowed BrokerMetadataError will be the result of close()
+ ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
+}