You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/16 12:56:45 UTC
[pulsar] 01/02: [issue 7851][C++] Make clear() thread-safe (#7862)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9d024040499805a0536d745b2f9ddce3da9ca983
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Aug 24 07:39:32 2020 +0800
[issue 7851][C++] Make clear() thread-safe (#7862)
Fixes #7851
### Motivation
The `clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe.
### Modifications
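Acquire a mutex in these `clear()` methods. Because `redeliverUnacknowledgedMessages()` may call `clear()`, `UnAckedMessageTrackerEnabled::timeoutHandlerHelper()` now holds a `std::unique_lock` and releases it before calling `redeliverUnacknowledgedMessages()` to avoid a deadlock. `testPartitionTopicUnAckedMessageTimeout` also closes the client at the end of the test.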
(cherry picked from commit 97f41120b9691474f0038b220f3204fa69e48257)
---
pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc | 1 +
pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 6 +++++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++
3 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index df15119..3d6d920 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -33,6 +33,7 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
}
void BatchAcknowledgementTracker::clear() {
+ Lock lock(mutex_);
trackerMap_.clear();
sendList_.clear();
}
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 9185dba..e280dba 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}
void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
- std::lock_guard<std::mutex> acquire(lock_);
+ std::unique_lock<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
@@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
timePartitions.push_back(headPartition);
if (msgIdsToRedeliver.size() > 0) {
+ // redeliverUnacknowledgedMessages() may call clear(), which acquires the lock again, so we must unlock
+ // here to avoid a deadlock
+ acquire.unlock();
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
}
@@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
}
void UnAckedMessageTrackerEnabled::clear() {
+ std::lock_guard<std::mutex> acquire(lock_);
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index cc71a35..acad16a 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1748,6 +1748,8 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}
+
+ client.close();
}
TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {