You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/03/23 07:27:35 UTC

[pulsar] branch branch-2.10 updated: [C++] Fix producer is never destructed until client is closed (#14797)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new d8e843b  [C++] Fix producer is never destructed until client is closed (#14797)
d8e843b is described below

commit d8e843bf92d9a768a61622fa131ecd48335ad91d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Mar 23 14:35:38 2022 +0800

    [C++] Fix producer is never destructed until client is closed (#14797)
    
    Fixes #509
    
    ### Motivation
    
    When a C++ producer is created successfully, it starts a send timer.
    However, the timer callback captures a shared pointer to the
    `ProducerImpl` itself. This extends the lifetime of the `ProducerImpl`,
    so even after the `Producer` object is destructed, the underlying
    `ProducerImpl` object is not. It can only be destructed after
    `Client::close()` is called.
    
    ### Modifications
    
    - Pass a weak pointer of `ProducerImpl` to the send timer and add an
      `asyncWaitSendTimeout` method for the combination of
      `expires_from_now` and `async_wait` calls on the timer.
    - Add `ClientTest.testReferenceCount` to verify that the reference count
      becomes 0 after the producer or consumer is destructed.
    
    (cherry picked from commit f7cbc1eb83ffd27b784d90d5d2dea8660c590ad2)
---
 pulsar-client-cpp/lib/ProducerImpl.cc  | 25 ++++++++++++++++---------
 pulsar-client-cpp/lib/ProducerImpl.h   |  2 ++
 pulsar-client-cpp/tests/ClientTest.cc  | 27 +++++++++++++++++++++++++++
 pulsar-client-cpp/tests/PulsarFriend.h |  8 ++++++++
 4 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 87f8552..bb95584 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -763,8 +763,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
     std::shared_ptr<PendingCallbacks> pendingCallbacks;
     if (pendingMessagesQueue_.empty()) {
         // If there are no pending messages, reset the timeout to the configured value.
-        sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
         LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
+        asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
     } else {
         // If there is at least one message, calculate the diff between the message timeout and
         // the current time.
@@ -774,17 +774,14 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
             LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
             pendingCallbacks = getPendingCallbacksWhenFailed();
             // Since the pending queue is cleared now, set timer to expire after configured value.
-            sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
+            asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
         } else {
             // The diff is greater than zero, set the timeout to the diff value
             LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff);
-            sendTimer_->expires_from_now(diff);
+            asyncWaitSendTimeout(diff);
         }
     }
 
-    // Asynchronously wait for the timeout to trigger
-    sendTimer_->async_wait(
-        std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
     lock.unlock();
     if (pendingCallbacks) {
         pendingCallbacks->complete(ResultTimeout);
@@ -946,11 +943,21 @@ void ProducerImpl::startSendTimeoutTimer() {
     // timeout to happen.
     if (!sendTimer_ && conf_.getSendTimeout() > 0) {
         sendTimer_ = executor_->createDeadlineTimer();
-        sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
-        sendTimer_->async_wait(
-            std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
+        asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
     }
 }
 
+void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
+    sendTimer_->expires_from_now(expiryTime);
+
+    ProducerImplBaseWeakPtr weakSelf = shared_from_this();
+    sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+        auto self = weakSelf.lock();
+        if (self) {
+            std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
+        }
+    });
+}
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 3dccf91..a8f35a6 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -170,6 +170,8 @@ class ProducerImpl : public HandlerBase,
 
     DeadlineTimerPtr sendTimer_;
     void handleSendTimeout(const boost::system::error_code& err);
+    using DurationType = typename boost::asio::deadline_timer::duration_type;
+    void asyncWaitSendTimeout(DurationType expiryTime);
 
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 6d55217..eb94ba6 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 
 #include "HttpHelper.h"
+#include "PulsarFriend.h"
 
 #include <future>
 #include <pulsar/Client.h>
@@ -189,3 +190,29 @@ TEST(ClientTest, testGetNumberOfReferences) {
 
     client.close();
 }
+
+TEST(ClientTest, testReferenceCount) {
+    Client client(lookupUrl);
+    const std::string topic = "client-test-reference-count-" + std::to_string(time(nullptr));
+
+    auto &producers = PulsarFriend::getProducers(client);
+    auto &consumers = PulsarFriend::getConsumers(client);
+
+    {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+        ASSERT_EQ(producers.size(), 1);
+        ASSERT_EQ(producers[0].use_count(), 1);
+
+        Consumer consumer;
+        ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
+        ASSERT_EQ(consumers.size(), 1);
+        ASSERT_EQ(consumers[0].use_count(), 1);
+    }
+
+    ASSERT_EQ(producers.size(), 1);
+    ASSERT_EQ(producers[0].use_count(), 0);
+    ASSERT_EQ(consumers.size(), 1);
+    ASSERT_EQ(consumers[0].use_count(), 0);
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index c7aa3a6..b04899c 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -95,6 +95,14 @@ class PulsarFriend {
 
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
 
+    static ClientImpl::ProducersList& getProducers(const Client& client) {
+        return getClientImplPtr(client)->producers_;
+    }
+
+    static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+        return getClientImplPtr(client)->consumers_;
+    }
+
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
         consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
     }