You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:16 UTC

[pulsar] 07/25: [CPP] Fix segmentation-fault crashes caused by a race condition on timers in the C++ client (#7572)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bcb90d2751203301e89a5adbf2a29d71a9073cfa
Author: Isaiah Rairdon <ir...@gmail.com>
AuthorDate: Tue Jul 28 08:22:08 2020 -0600

    [CPP] Fix segmentation-fault crashes caused by a race condition on timers in the C++ client (#7572)
    
    * Kevin Wilson's changes to fix segmentation-fault crashes in Pulsar
    
    * After merging with master, address the following review comments: ptr, unnecessary change
    
    * Add lock to avoid concurrent access
    
    * Remove comments
    
    Co-authored-by: Isaiah Rairdon <is...@microfocus.com>
    Co-authored-by: Jia Zhai <zh...@apache.org>
    Co-authored-by: Sijie Guo <si...@apache.org>
    Co-authored-by: xiaolong.ran <rx...@apache.org>
    (cherry picked from commit 15d5254e49b96719638f9efec391a6beeed00bb9)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 40 +++++++++++++++++++++----------
 1 file changed, 28 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3628dd5..ebb7268 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -256,10 +256,14 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
 
     if (serverProtocolVersion_ >= v1) {
         // Only send keep-alive probes if the broker supports it
-        DeadlineTimerPtr keepAliveTimer = executor_->createDeadlineTimer();
-        keepAliveTimer->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
-        keepAliveTimer_ = keepAliveTimer;
+        keepAliveTimer_ = executor_->createDeadlineTimer();
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        }
+        lock.unlock();
     }
 
     if (serverProtocolVersion_ >= v8) {
@@ -289,13 +293,14 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
         consumerStatsRequests.push_back(it->first);
     }
 
-    DeadlineTimerPtr timer = consumerStatsRequestTimer_;
-    if (timer) {
-        timer->expires_from_now(operationsTimeout_);
-        timer->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout, shared_from_this(),
-                                    std::placeholders::_1, consumerStatsRequests));
+    // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
+    // Check if we have a timer still before we set the request timer to pop again.
+    if (consumerStatsRequestTimer_) {
+        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
+        consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+                                                         shared_from_this(), std::placeholders::_1,
+                                                         consumerStatsRequests));
     }
-
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -1344,8 +1349,15 @@ void ClientConnection::handleKeepAliveTimeout() {
         havePendingPingRequest_ = true;
         sendCommand(Commands::newPing());
 
-        keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        // If the close operation has already called the keepAliveTimer_.reset() then the use_count will be
+        // zero And we do not attempt to dereference the pointer.
+        Lock lock(mutex_);
+        if (keepAliveTimer_) {
+            keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
+            keepAliveTimer_->async_wait(
+                std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        }
+        lock.unlock();
     }
 }
 
@@ -1375,13 +1387,17 @@ void ClientConnection::close() {
     LOG_INFO(cnxString_ << "Connection closed");
 
     if (keepAliveTimer_) {
+        lock.lock();
         keepAliveTimer_->cancel();
         keepAliveTimer_.reset();
+        lock.unlock();
     }
 
     if (consumerStatsRequestTimer_) {
+        lock.lock();
         consumerStatsRequestTimer_->cancel();
         consumerStatsRequestTimer_.reset();
+        lock.unlock();
     }
 
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {