You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/25 02:29:22 UTC
[pulsar] 02/02: [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 357dd4503caef1b96c428d8787f8026cb6b5c042
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Aug 25 10:20:51 2022 +0800
[fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 97 +++++++++++-----------
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 2 +-
pulsar-client-cpp/lib/SynchronizedHashMap.h | 11 +++
pulsar-client-cpp/tests/ClientTest.cc | 11 +++
pulsar-client-cpp/tests/SynchronizedHashMapTest.cc | 9 +-
5 files changed, 79 insertions(+), 51 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 58b55e35f95..ad30d5cca50 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -82,16 +82,17 @@ void MultiTopicsConsumerImpl::start() {
void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
- (*topicsNeedCreate)--;
-
if (result != ResultOk) {
state_ = Failed;
+ // Use the first failed result
+ auto expectedResult = ResultOk;
+ failedResult.compare_exchange_strong(expectedResult, result);
LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+ } else {
+ LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
}
- LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
-
- if (topicsNeedCreate->load() == 0) {
+ if (--(*topicsNeedCreate) == 0) {
MultiTopicsConsumerState state = Pending;
if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
@@ -99,11 +100,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// unsubscribed all of the successfully subscribed partitioned consumers
- closeAsync(nullptr);
- multiTopicsConsumerCreatedPromise_.setFailed(result);
- return;
+ // It's safe to capture only `this` here, because the callback can be invoked only while `this` is still valid
+ closeAsync(
+ [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
}
- return;
}
}
@@ -364,13 +364,47 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
state_ = Closing;
- auto self = shared_from_this();
+ std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
int numConsumers = 0;
- consumers_.forEach(
- [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+ consumers_.clear(
+ [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
numConsumers++;
- consumer->closeAsync([self, name, callback](Result result) {
- self->handleSingleConsumerClose(result, name, callback);
+ consumer->closeAsync([this, weakSelf, name, callback](Result result) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ return;
+ }
+ LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
+ << numberTopicPartitions_->load());
+ const int numConsumersLeft = --*numberTopicPartitions_;
+ if (numConsumersLeft < 0) {
+ LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
+ << " during close");
+ return;
+ }
+ if (result != ResultOk) {
+ state_ = Failed;
+ LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+ << result);
+ }
+ // closed all consumers
+ if (numConsumersLeft == 0) {
+ messages_.clear();
+ topicsPartitions_.clear();
+ unAckedMessageTrackerPtr_->clear();
+
+ if (state_ != Failed) {
+ state_ = Closed;
+ }
+
+ if (callback) {
+ callback(result);
+ }
+ }
});
});
if (numConsumers == 0) {
@@ -387,41 +421,6 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
failPendingReceiveCallback();
}
-void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName,
- CloseCallback callback) {
- consumers_.remove(topicPartitionName);
-
- LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
- << numberTopicPartitions_->load());
-
- assert(numberTopicPartitions_->load() > 0);
- numberTopicPartitions_->fetch_sub(1);
-
- if (result != ResultOk) {
- state_ = Failed;
- LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
- << result);
- }
-
- // closed all consumers
- if (numberTopicPartitions_->load() == 0) {
- messages_.clear();
- consumers_.clear();
- topicsPartitions_.clear();
- unAckedMessageTrackerPtr_->clear();
-
- if (state_ != Failed) {
- state_ = Closed;
- }
-
- multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
- if (callback) {
- callback(result);
- }
- return;
- }
-}
-
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d2ede4a8770..0f111110c44 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -105,6 +105,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
MessageListener messageListener_;
LookupServicePtr lookupServicePtr_;
std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+ std::atomic<Result> failedResult{ResultOk};
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
@@ -113,7 +114,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
unsigned int partitionIndex);
- void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
void notifyResult(CloseCallback closeCallback);
void messageReceived(Consumer consumer, const Message& msg);
void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 3a784675dd1..184ca6a2836 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -70,6 +70,17 @@ class SynchronizedHashMap {
data_.clear();
}
+ // clear the map and apply `f` on each removed value
+ void clear(std::function<void(const K&, const V&)> f) {
+ Lock lock(mutex_);
+ auto it = data_.begin();
+ while (it != data_.end()) {
+ f(it->first, it->second);
+ auto next = data_.erase(it);
+ it = next;
+ }
+ }
+
OptValue find(const K& key) const {
Lock lock(mutex_);
auto it = data_.find(key);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 00696b9a5a4..58c889f074a 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -262,6 +262,17 @@ TEST(ClientTest, testWrongListener) {
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+ Consumer multiTopicsConsumer;
+ ASSERT_EQ(ResultServiceUnitNotReady,
+ client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
+ "sub", multiTopicsConsumer));
+
+ ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+ ASSERT_EQ(ResultOk, client.close());
+
+ // Re-create the client, because the previous instance was closed above
+ client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
// Currently Reader can only read a non-partitioned topic in C++ client
Reader reader;
ASSERT_EQ(ResultServiceUnitNotReady,
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
index 62c55c46c8e..8d74a24014a 100644
--- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -40,9 +40,16 @@ inline PairVector sort(PairVector pairs) {
}
TEST(SynchronizedHashMap, testClear) {
- SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
+ SyncMapType m({{1, 100}, {2, 200}});
m.clear();
ASSERT_EQ(m.toPairVector(), PairVector{});
+
+ PairVector expectedPairs({{3, 300}, {4, 400}});
+ SyncMapType m2(expectedPairs);
+ PairVector pairs;
+ m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); });
+ ASSERT_EQ(m2.toPairVector(), PairVector{});
+ ASSERT_EQ(sort(pairs), expectedPairs);
}
TEST(SynchronizedHashMap, testRemoveAndFind) {