You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/27 05:33:52 UTC

[pulsar] branch master updated: [cpp client] implement reference count for close() (#3863)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee98e8b  [cpp client] implement reference count for close() (#3863)
ee98e8b is described below

commit ee98e8b28ce1397c056757fe73115d21320ed883
Author: Ezequiel Lovelle <ez...@gmail.com>
AuthorDate: Wed Mar 27 02:33:46 2019 -0300

    [cpp client] implement reference count for close() (#3863)
    
    Add a reference count to keep track of how many times a consumer instance
    is being reused. For more details, please see commit ff4db8d.
    
    *Modifications*
    
      - Add a refCount_ instance variable to ConsumerImpl.
      - Call the new safeDecrRefCount() on consumer close() in order to decide
        whether the consumer should actually be closed or not.
      - Increment the reference count when a previously built consumer instance
        is reused by a caller.
    
    *Future considerations*
    
    Once the feature preventing duplicated consumers is implemented for
    PartitionedConsumer, MultiTopicsConsumer and PatternMultiTopicsConsumer,
    the incrRefCount() member could be turned into a pure virtual method.
---
 pulsar-client-cpp/lib/ClientImpl.cc          |  1 +
 pulsar-client-cpp/lib/ConsumerImpl.cc        | 16 ++++++++++++++--
 pulsar-client-cpp/lib/ConsumerImpl.h         |  3 +++
 pulsar-client-cpp/lib/ConsumerImplBase.h     |  1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc |  4 ++++
 5 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 0a8aaa6..e2c15e0 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -346,6 +346,7 @@ void ClientImpl::subscribeAsync(const std::string& topic, const std::string& con
                 ConsumerImplBasePtr consumer = weakPtr.lock();
                 if (consumer && consumer->getSubscriptionName() == consumerName &&
                     consumer->getTopic() == topic && !consumer->isClosed()) {
+                    consumer->incrRefCount();
                     lock.unlock();
                     LOG_INFO("Reusing existing consumer instance for " << topic << " -- " << consumerName);
                     callback(ResultOk, Consumer(consumer));
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 980d9f3..d9fedaf 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -110,6 +110,10 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()
     return consumerCreatedPromise_.getFuture();
 }
 
+void ConsumerImpl::incrRefCount() { ++refCount_; }
+
+unsigned int ConsumerImpl::safeDecrRefCount() { return refCount_ > 0 ? refCount_-- : refCount_; }
+
 const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }
 
 const std::string& ConsumerImpl::getTopic() const { return topic_; }
@@ -726,10 +730,10 @@ inline proto::CommandSubscribe_InitialPosition ConsumerImpl::getInitialPosition(
     InitialPosition initialPosition = config_.getSubscriptionInitialPosition();
     switch (initialPosition) {
         case InitialPositionLatest:
-            return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Latest;
+            return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Latest;
 
         case InitialPositionEarliest:
-            return proto::CommandSubscribe_InitialPosition ::CommandSubscribe_InitialPosition_Earliest;
+            return proto::CommandSubscribe_InitialPosition::CommandSubscribe_InitialPosition_Earliest;
     }
 }
 
@@ -817,6 +821,14 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
+    if (safeDecrRefCount() != 0) {
+        lock.unlock();
+        if (callback) {
+            callback(ResultOk);
+        }
+        return;
+    }
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b917f790..16ce091 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -113,6 +113,7 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
+    virtual void incrRefCount();
 
    protected:
     void connectionOpened(const ClientConnectionPtr& cnx);
@@ -146,6 +147,7 @@ class ConsumerImpl : public ConsumerImplBase,
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
+    unsigned int safeDecrRefCount();
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -176,6 +178,7 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     NegativeAcksTracker negativeAcksTracker_;
+    unsigned int refCount_ = 0;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index c716270..78b5e12 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -53,6 +53,7 @@ class ConsumerImplBase {
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
+    virtual void incrRefCount(){};
 };
 }  // namespace pulsar
 #endif  // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 1ea31b7..cd5d343 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2818,6 +2818,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersOnSharedMode) {
     // Since this is a shared consumer over same client cnx
     // closing consumerA should result in consumerB also being closed.
     ASSERT_EQ(ResultOk, consumerA.close());
+    ASSERT_EQ(ResultOk, consumerB.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
     ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
 }
 
@@ -2936,6 +2938,8 @@ TEST(BasicEndToEndTest, testPreventDupConsumersAllowSameSubForDifferentTopics) {
     ASSERT_EQ(ResultOk, resultB);
     ASSERT_EQ(consumerB.getSubscriptionName(), subsName);
     ASSERT_EQ(ResultOk, consumerA.close());
+    ASSERT_EQ(ResultOk, consumerB.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
     ASSERT_EQ(ResultAlreadyClosed, consumerB.close());
 
     // consumer C should be a different instance from A and B and should be with open state.