You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:16 UTC
[pulsar] 07/25: [CPP] Fix segmentation crashes caused by a timer race
condition in the cpp client (#7572)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bcb90d2751203301e89a5adbf2a29d71a9073cfa
Author: Isaiah Rairdon <ir...@gmail.com>
AuthorDate: Tue Jul 28 08:22:08 2020 -0600
[CPP] Fix segmentation crashes caused by a timer race condition in the cpp client (#7572)
* Kevin Wilson's changes to fix segmentation crashes in Pulsar
* after merge with master, change following comments: ptr, unnecessary change
* add lock to avoid concurrent access
* Remove comments
Co-authored-by: Isaiah Rairdon <is...@microfocus.com>
Co-authored-by: Jia Zhai <zh...@apache.org>
Co-authored-by: Sijie Guo <si...@apache.org>
Co-authored-by: xiaolong.ran <rx...@apache.org>
(cherry picked from commit 15d5254e49b96719638f9efec391a6beeed00bb9)
---
pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++----------
1 file changed, 28 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3628dd5..ebb7268 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
if (serverProtocolVersion_ >= v1) {
// Only send keep-alive probes if the broker supports it
- DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
- keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
- keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
- keepAliveTimer_ = keepAliveTimer;
+ keepAliveTimer_ = executor_->createDeadlineTimer();
+ Lock lock(mutex_);
+ if (keepAliveTimer_) {
+ keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+ keepAliveTimer_->async_wait(
+ std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+ }
+ lock.unlock();
}
if (serverProtocolVersion_ >= v8) {
@@ -289,13 +293,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
consumerStatsRequests.push_back(it->first);
}
- DeadlineTimerPtr timer = consumerStatsRequestTimer_;
- if (timer) {
- timer->expires_from_now(operationsTimeout_);
- timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
- std::placeholders::_1, consumerStatsRequests));
+ // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
+ // Check if we have a timer still before we set the request timer to pop again.
+ if (consumerStatsRequestTimer_) {
+ consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+ consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+ shared_from_this(), std::placeholders::_1,
+ consumerStatsRequests));
}
-
lock.unlock();
// Complex logic since promises need to be fulfilled outside the lock
for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() {
havePendingPingRequest_ = true;
sendCommand(Commands::newPing());
- keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
- keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+ // If the close operation has already called the keepAliveTimer_.reset() then the use_count will be
+ // zero And we do not attempt to dereference the pointer.
+ Lock lock(mutex_);
+ if (keepAliveTimer_) {
+ keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+ keepAliveTimer_->async_wait(
+ std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+ }
+ lock.unlock();
}
}
@@ -1375,13 +1387,17 @@ void ClientConnection::close() {
LOG_INFO(cnxString_ << "Connection closed");
if (keepAliveTimer_) {
+ lock.lock();
keepAliveTimer_->cancel();
keepAliveTimer_.reset();
+ lock.unlock();
}
if (consumerStatsRequestTimer_) {
+ lock.lock();
consumerStatsRequestTimer_->cancel();
consumerStatsRequestTimer_.reset();
+ lock.unlock();
}
for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {