You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/05 01:48:36 UTC

[pulsar] 02/04: [Issue 11493] Simple implementation of getting number of references from C++ client (#11535)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f8d4779a0f3fdd9d2e90f1007ed891ae566f488
Author: Jiabei Zhao <41...@users.noreply.github.com>
AuthorDate: Tue Aug 3 22:55:15 2021 +0800

    [Issue 11493] Simple implementation of getting number of references from C++ client (#11535)
    
    Fixes #11493
    
    Master Issue: #11493
    
    ### Motivation
    In Pulsar, we use a single client to create multiple producers/consumers/readers. Is there any method/attribute that can give information on number of producers/readers/consumers connected to the given pulsar client instance at the given point of time?
    
    Say there is a single pulsar client instance. Multiple consumers and readers are created from the given client instance. This client needs to be cleaned up when all the references are closed. In this case, it would be of help, to get information on the number of the consumers/readers/ getPartitionsForTopic calls are active on the given client. Ie, having the number of references for the given client can provide information on whether it is fine to clean up the client instance at the g [...]
    
    *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
    
    ### Modifications
    
    Add these method to `Client.h`, `Client.cc`, `ClientImpl.h`, `CliemtImpl.cc` :
    ```
    uint64_t getNumberOfProducer()
    uint64_t getNumberOfConsumer()
    ```
    To count alive producers, I get each producer by weak point and check if it is connected.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
    Add unit test to check if these functions can return correct references number. Test file is `ClientTest.cc`. Test function is `TEST(ClientTest, testGetNumberOfReferences)`.
    
    (cherry picked from commit e6909c653c593fd36c62fd4015809f64cbb28cdb)
---
 pulsar-client-cpp/include/pulsar/Client.h        | 14 ++++++
 pulsar-client-cpp/lib/Client.cc                  |  3 ++
 pulsar-client-cpp/lib/ClientImpl.cc              | 24 ++++++++++
 pulsar-client-cpp/lib/ClientImpl.h               |  3 ++
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  2 +
 pulsar-client-cpp/lib/ConsumerImpl.h             |  1 +
 pulsar-client-cpp/lib/ConsumerImplBase.h         |  1 +
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 13 +++++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h  |  2 +
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 13 +++++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h  |  1 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 13 +++++
 pulsar-client-cpp/lib/PartitionedProducerImpl.h  |  2 +-
 pulsar-client-cpp/lib/ProducerImpl.cc            |  2 +
 pulsar-client-cpp/lib/ProducerImpl.h             |  1 +
 pulsar-client-cpp/lib/ProducerImplBase.h         |  1 +
 pulsar-client-cpp/tests/ClientTest.cc            | 61 ++++++++++++++++++++++++
 17 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index 1bc2614..49dd011 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -355,6 +355,20 @@ class PULSAR_PUBLIC Client {
      */
     void shutdown();
 
+    /**
+     * @brief Get the number of alive producers on the current client.
+     *
+     * @return The number of alive producers on the  current client.
+     */
+    uint64_t getNumberOfProducers();
+
+    /**
+     * @brief Get the number of alive consumers on the current client.
+     *
+     * @return The number of alive consumers on the current client.
+     */
+    uint64_t getNumberOfConsumers();
+
    private:
     Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
            bool poolConnections);
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index 59bf8f4..c72232a 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -175,4 +175,7 @@ Result Client::close() {
 void Client::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); }
 
 void Client::shutdown() { impl_->shutdown(); }
+
+uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); }
+uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); }
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index b93ad2d..02099a5 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -581,6 +581,30 @@ uint64_t ClientImpl::newRequestId() {
     return requestIdGenerator_++;
 }
 
+uint64_t ClientImpl::getNumberOfProducers() {
+    Lock lock(mutex_);
+    uint64_t numberOfAliveProducers = 0;
+    for (const auto& producer : producers_) {
+        const auto& producerImpl = producer.lock();
+        if (producerImpl) {
+            numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
+        }
+    }
+    return numberOfAliveProducers;
+}
+
+uint64_t ClientImpl::getNumberOfConsumers() {
+    Lock lock(mutex_);
+    uint64_t numberOfAliveConsumers = 0;
+    for (const auto& consumer : consumers_) {
+        const auto consumerImpl = consumer.lock();
+        if (consumerImpl) {
+            numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
+        }
+    }
+    return numberOfAliveConsumers;
+}
+
 const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; }
 
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 81eb596..847872a 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -83,6 +83,9 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     uint64_t newConsumerId();
     uint64_t newRequestId();
 
+    uint64_t getNumberOfProducers();
+    uint64_t getNumberOfConsumers();
+
     const ClientConfiguration& getClientConfig() const;
 
     const ClientConfiguration& conf() const;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index c268417..fea7d4d 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -1233,4 +1233,6 @@ bool ConsumerImpl::isConnected() const {
     return !getCnx().expired() && state_ == Ready;
 }
 
+uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
+
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 28f96dd..cde8293 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -125,6 +125,7 @@ class ConsumerImpl : public ConsumerImplBase,
     void seekAsync(uint64_t timestamp, ResultCallback callback) override;
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
+    uint64_t getNumberOfConnectedConsumer() override;
 
     virtual void disconnectConsumer();
     Result fetchSingleMessageFromBroker(Message& msg);
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index c8f36d0..693d4da 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -56,6 +56,7 @@ class ConsumerImplBase {
     virtual void seekAsync(uint64_t timestamp, ResultCallback callback) = 0;
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
     virtual bool isConnected() const = 0;
+    virtual uint64_t getNumberOfConnectedConsumer() = 0;
 
    private:
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 4b0d741..64aaada 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -749,3 +749,16 @@ bool MultiTopicsConsumerImpl::isConnected() const {
     }
     return true;
 }
+
+uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
+    Lock lock(mutex_);
+    uint64_t numberOfConnectedConsumer = 0;
+    const auto consumers = consumers_;
+    lock.unlock();
+    for (const auto& topicAndConsumer : consumers) {
+        if (topicAndConsumer.second->isConnected()) {
+            numberOfConnectedConsumer++;
+        }
+    }
+    return numberOfConnectedConsumer;
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index df3039b..3a1249b 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -78,6 +78,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void seekAsync(uint64_t timestamp, ResultCallback callback) override;
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
+    uint64_t getNumberOfConnectedConsumer() override;
+
     void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
                                 size_t, BrokerConsumerStatsCallback);
     // return first topic name when all topics name valid, or return null pointer
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 9a9c882..7aa506e 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -635,4 +635,17 @@ bool PartitionedConsumerImpl::isConnected() const {
     return true;
 }
 
+uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() {
+    uint64_t numberOfConnectedConsumer = 0;
+    Lock consumersLock(consumersMutex_);
+    const auto consumers = consumers_;
+    consumersLock.unlock();
+    for (const auto& consumer : consumers) {
+        if (consumer->isConnected()) {
+            numberOfConnectedConsumer++;
+        }
+    }
+    return numberOfConnectedConsumer;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 2696288..83ada95 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -74,6 +74,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     void seekAsync(uint64_t timestamp, ResultCallback callback) override;
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
+    uint64_t getNumberOfConnectedConsumer() override;
 
     void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr,
                                 size_t, BrokerConsumerStatsCallback);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0df729d..4e01263 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -387,4 +387,17 @@ bool PartitionedProducerImpl::isConnected() const {
     return true;
 }
 
+uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
+    uint64_t numberOfConnectedProducer = 0;
+    Lock producersLock(producersMutex_);
+    const auto producers = producers_;
+    producersLock.unlock();
+    for (const auto& producer : producers) {
+        if (producer->isConnected()) {
+            numberOfConnectedProducer++;
+        }
+    }
+    return numberOfConnectedProducer;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index cd1ee8c..c097190 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -64,7 +64,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void triggerFlush() override;
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
-
+    uint64_t getNumberOfConnectedProducer() override;
     void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int partitionIndex);
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index d81e958..c7a4551 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -826,5 +826,7 @@ bool ProducerImpl::isConnected() const {
     return !getCnx().expired() && state_ == Ready;
 }
 
+uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index caec85c..2c51d41 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -70,6 +70,7 @@ class ProducerImpl : public HandlerBase,
     void triggerFlush() override;
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
+    uint64_t getNumberOfConnectedProducer() override;
 
     bool removeCorruptMessage(uint64_t sequenceId);
 
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h
index a947624..15a6e1d 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -45,6 +45,7 @@ class ProducerImplBase {
     virtual void triggerFlush() = 0;
     virtual void flushAsync(FlushCallback callback) = 0;
     virtual bool isConnected() const = 0;
+    virtual uint64_t getNumberOfConnectedProducer() = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 129a322..10b5b32 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -18,6 +18,8 @@
  */
 #include <gtest/gtest.h>
 
+#include "HttpHelper.h"
+
 #include <future>
 #include <pulsar/Client.h>
 #include "../lib/checksum/ChecksumProvider.h"
@@ -114,3 +116,62 @@ TEST(ClientTest, testConnectTimeout) {
     clientLow.close();
     clientDefault.close();
 }
+
+TEST(ClientTest, testGetNumberOfReferences) {
+    Client client("pulsar://localhost:6650");
+
+    // Producer test
+    uint64_t numberOfProducers = 0;
+    const std::string nonPartitionedTopic =
+        "testGetNumberOfReferencesNonPartitionedTopic" + std::to_string(time(nullptr));
+
+    const std::string partitionedTopic =
+        "testGetNumberOfReferencesPartitionedTopic" + std::to_string(time(nullptr));
+    Producer producer;
+    client.createProducer(nonPartitionedTopic, producer);
+    numberOfProducers = 1;
+    ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
+
+    producer.close();
+    numberOfProducers = 0;
+    ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
+
+    // PartitionedProducer
+    int res = makePutRequest(
+        "http://localhost:8080/admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    client.createProducer(partitionedTopic, producer);
+    numberOfProducers = 2;
+    ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
+    producer.close();
+    numberOfProducers = 0;
+    ASSERT_EQ(numberOfProducers, client.getNumberOfProducers());
+
+    // Consumer test
+    uint64_t numberOfConsumers = 0;
+
+    Consumer consumer1;
+    client.subscribe(nonPartitionedTopic, "consumer-1", consumer1);
+    numberOfConsumers = 1;
+    ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
+
+    consumer1.close();
+    numberOfConsumers = 0;
+    ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
+
+    Consumer consumer2;
+    Consumer consumer3;
+    client.subscribe(partitionedTopic, "consumer-2", consumer2);
+    numberOfConsumers = 2;
+    ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
+    client.subscribe(nonPartitionedTopic, "consumer-3", consumer3);
+    numberOfConsumers = 3;
+    ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
+    consumer2.close();
+    consumer3.close();
+    numberOfConsumers = 0;
+    ASSERT_EQ(numberOfConsumers, client.getNumberOfConsumers());
+
+    client.close();
+}
\ No newline at end of file