You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/16 12:56:45 UTC

[pulsar] 01/02: [issue 7851][C++] Make clear() thread-safe (#7862)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9d024040499805a0536d745b2f9ddce3da9ca983
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Aug 24 07:39:32 2020 +0800

    [issue 7851][C++] Make clear() thread-safe (#7862)
    
    Fixes #7851
    
    ### Motivation
    
    The `clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe.
    
    ### Modifications
    
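    Acquire a mutex in these `clear()` methods. Because `redeliverUnacknowledgedMessages()` may call `clear()`, `UnAckedMessageTrackerEnabled::timeoutHandlerHelper()` now holds its lock with a `std::unique_lock` and releases it before triggering redelivery, to avoid a deadlock.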
    (cherry picked from commit 97f41120b9691474f0038b220f3204fa69e48257)
---
 pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc  | 1 +
 pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 6 +++++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc          | 2 ++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index df15119..3d6d920 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -33,6 +33,7 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
 }
 
 void BatchAcknowledgementTracker::clear() {
+    Lock lock(mutex_);
     trackerMap_.clear();
     sendList_.clear();
 }
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 9185dba..e280dba 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
 }
 
 void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
-    std::lock_guard<std::mutex> acquire(lock_);
+    std::unique_lock<std::mutex> acquire(lock_);
     LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
               << consumerReference_.getName().c_str());
 
@@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
     timePartitions.push_back(headPartition);
 
     if (msgIdsToRedeliver.size() > 0) {
+        // redeliverUnacknowledgedMessages() may call clear(), which acquires the lock again, so we must
+        // unlock here to avoid a deadlock
+        acquire.unlock();
         consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
     }
 }
@@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
 }
 
 void UnAckedMessageTrackerEnabled::clear() {
+    std::lock_guard<std::mutex> acquire(lock_);
     messageIdPartitionMap.clear();
     for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
         it->clear();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index cc71a35..acad16a 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1748,6 +1748,8 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
         std::this_thread::sleep_for(std::chrono::milliseconds(500));
         timeWaited += 500;
     }
+
+    client.close();
 }
 
 TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {