You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/27 05:33:52 UTC
[pulsar] branch master updated: [cpp client] implement reference
count for close() (#3863)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee98e8b [cpp client] implement reference count for close() (#3863)
ee98e8b is described below
commit ee98e8b28ce1397c056757fe73115d21320ed883
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Wed Mar 27 02:33:46 2019 -0300
[cpp client] implement reference count for close() (#3863)
Add reference count feature to keep track of reused instances of a consumer
instance, for more details please see commit ff4db8d.
*Modifications*
- Add refCount instance variable on ConsumerImpl.
- Use new safeDecrRefCount() on consumer close() in order to know whether
effective close call should occur or not.
- Increment reference count when a previous built consumer instance is being
used by caller.
*Future considerations*
Thereafter when feature preventing duplicated consumer is made for
PartitionedConsumer, MultiTopicsConsumer and PatternMultiTopicsConsumer,
incrRefCount() member could be turned into a pure virtual method.
---
pulsar-client-cpp/lib/ClientImpl.cc | 1 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 16 ++++++++++++++--
pulsar-client-cpp/lib/ConsumerImpl.h | 3 +++
pulsar-client-cpp/lib/ConsumerImplBase.h | 1 +
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 4 ++++
5 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 0a8aaa6..e2c15e0 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -346,6 +346,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
ConsumerImplBasePtr consumer = weakPtr.lock();
if (consumer && consumer->getSubscriptionName() == consumerName &&
consumer->getTopic() == topic && !consumer->isClosed()) {
+ consumer->incrRefCount();
lock.unlock();
LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
callback(ResultOk, Consumer(consumer));
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 980d9f3..d9fedaf 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -110,6 +110,10 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
return consumerCreatedPromise_.getFuture();
}
+void ConsumerImpl::incrRefCount() { ++refCount_; }
+
+unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? refCount_-- : refCount_; }
+
const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -726,10 +730,10 @@ inline proto::CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition(
InitialPosition initialPosition = config_.getSubscriptionInitialPosition();
switch (initialPosition) {
case InitialPositionLatest:
- return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Latest;
+ return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Latest;
case InitialPositionEarliest:
- return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Earliest;
+ return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Earliest;
}
}
@@ -817,6 +821,14 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
+ if (safeDecrRefCount() != 0) {
+ lock.unlock();
+ if (callback) {
+ callback(ResultOk);
+ }
+ return;
+ }
+
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b917f790..16ce091 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -113,6 +113,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
+ virtual void incrRefCount();
protected:
void connectionOpened(const ClientConnectionPtr& cnx);
@@ -146,6 +147,7 @@ class ConsumerImpl : public ConsumerImplBase,
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
+ unsigned int safeDecrRefCount();
Optional<MessageId> clearReceiveQueue();
@@ -176,6 +178,7 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
+ unsigned int refCount_ = 0;
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index c716270..78b5e12 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -53,6 +53,7 @@ class ConsumerImplBase {
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
+ virtual void incrRefCount(){};
};
} // namespace pulsar
#endif // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 1ea31b7..cd5d343 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2818,6 +2818,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
// Since this is a shared consumer over same client cnx
// closing consumerA should result in consumerB also being closed.
ASSERT_EQ(ResultOk, consumerA.close());
+ ASSERT_EQ(ResultOk, consumerB.close());
+ ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
}
@@ -2936,6 +2938,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
ASSERT_EQ(ResultOk, resultB);
ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
ASSERT_EQ(ResultOk, consumerA.close());
+ ASSERT_EQ(ResultOk, consumerB.close());
+ ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
// consumer C should be a different instance from A and B and should be with open state.