You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 12:40:07 UTC

[pulsar] 03/05: [C++] Use weak ref to ClientConnection for timeout task (#12409)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2c364d90e1293eeb994b6c96cab808fb420fa799
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Oct 18 19:47:57 2021 -0700

    [C++] Use weak ref to ClientConnection for timeout task (#12409)
    
    ### Motivation
    
    Fixes #12408. Using a weak reference in the timeout task for `ClientConnection` to break a circular reference dependency between the connection instance and the task.
    
    (cherry picked from commit 4e43a1dd85809f0242e354aef7a27973820e0dda)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index b9f2209..2128689 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -532,18 +532,25 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
         return;
     }
 
-    auto self = shared_from_this();
-    connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
-        if (state_ != Ready) {
-            LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
-                                 << " ms, close the socket");
+    auto self = ClientConnectionWeakPtr(shared_from_this());
+
+    connectTimeoutTask_->setCallback([self](const PeriodicTask::ErrorCode& ec) {
+        ClientConnectionPtr ptr = self.lock();
+        if (!ptr) {
+            // Connection was already destroyed
+            return;
+        }
+
+        if (ptr->state_ != Ready) {
+            LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+                                      << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
             PeriodicTask::ErrorCode err;
-            socket_->close(err);
+            ptr->socket_->close(err);
             if (err) {
-                LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
             }
         }
-        connectTimeoutTask_->stop();
+        ptr->connectTimeoutTask_->stop();
     });
 
     LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");