You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 12:40:07 UTC
[pulsar] 03/05: [C++] Use weak ref to ClientConnection for timeout task (#12409)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2c364d90e1293eeb994b6c96cab808fb420fa799
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Oct 18 19:47:57 2021 -0700
[C++] Use weak ref to ClientConnection for timeout task (#12409)
### Motivation
Fixes #12408. Using a weak reference in the timeout task for `ClientConnection` to break a circular reference dependency between the connection instance and the task.
(cherry picked from commit 4e43a1dd85809f0242e354aef7a27973820e0dda)
---
pulsar-client-cpp/lib/ClientConnection.cc | 23 +++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index b9f2209..2128689 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -532,18 +532,25 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
return;
}
- auto self = shared_from_this();
- connectTimeoutTask_->setCallback([this, self](const PeriodicTask::ErrorCode& ec) {
- if (state_ != Ready) {
- LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
- << " ms, close the socket");
+ auto self = ClientConnectionWeakPtr(shared_from_this());
+
+ connectTimeoutTask_->setCallback([self](const PeriodicTask::ErrorCode& ec) {
+ ClientConnectionPtr ptr = self.lock();
+ if (!ptr) {
+ // Connection was already destroyed
+ return;
+ }
+
+ if (ptr->state_ != Ready) {
+ LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
+ << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
PeriodicTask::ErrorCode err;
- socket_->close(err);
+ ptr->socket_->close(err);
if (err) {
- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
}
}
- connectTimeoutTask_->stop();
+ ptr->connectTimeoutTask_->stop();
});
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");