You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/25 02:29:22 UTC

[pulsar] 02/02: [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 357dd4503caef1b96c428d8787f8026cb6b5c042
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Aug 25 10:20:51 2022 +0800

    [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 97 +++++++++++-----------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  2 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 11 +++
 pulsar-client-cpp/tests/ClientTest.cc              | 11 +++
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |  9 +-
 5 files changed, 79 insertions(+), 51 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 58b55e35f95..ad30d5cca50 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -82,16 +82,17 @@ void MultiTopicsConsumerImpl::start() {
 void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
                                                        const std::string& topic,
                                                        std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
-    (*topicsNeedCreate)--;
-
     if (result != ResultOk) {
         state_ = Failed;
+        // Use the first failed result
+        auto expectedResult = ResultOk;
+        failedResult.compare_exchange_strong(expectedResult, result);
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+    } else {
+        LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
     }
 
-    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
-
-    if (topicsNeedCreate->load() == 0) {
+    if (--(*topicsNeedCreate) == 0) {
         MultiTopicsConsumerState state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
@@ -99,11 +100,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            closeAsync(nullptr);
-            multiTopicsConsumerCreatedPromise_.setFailed(result);
-            return;
+            // It's safe to capture only this here, because the callback can be called only when this is valid
+            closeAsync(
+                [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
         }
-        return;
     }
 }
 
@@ -364,13 +364,47 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     state_ = Closing;
 
-    auto self = shared_from_this();
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
     int numConsumers = 0;
-    consumers_.forEach(
-        [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+    consumers_.clear(
+        [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
             numConsumers++;
-            consumer->closeAsync([self, name, callback](Result result) {
-                self->handleSingleConsumerClose(result, name, callback);
+            consumer->closeAsync([this, weakSelf, name, callback](Result result) {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+                LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
+                                                                  << numberTopicPartitions_->load());
+                const int numConsumersLeft = --*numberTopicPartitions_;
+                if (numConsumersLeft < 0) {
+                    LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
+                                  << " during close");
+                    return;
+                }
+                if (result != ResultOk) {
+                    state_ = Failed;
+                    LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+                                                                             << result);
+                }
+                // closed all consumers
+                if (numConsumersLeft == 0) {
+                    messages_.clear();
+                    topicsPartitions_.clear();
+                    unAckedMessageTrackerPtr_->clear();
+
+                    if (state_ != Failed) {
+                        state_ = Closed;
+                    }
+
+                    if (callback) {
+                        callback(result);
+                    }
+                }
             });
         });
     if (numConsumers == 0) {
@@ -387,41 +421,6 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     failPendingReceiveCallback();
 }
 
-void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName,
-                                                        CloseCallback callback) {
-    consumers_.remove(topicPartitionName);
-
-    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
-                                                      << numberTopicPartitions_->load());
-
-    assert(numberTopicPartitions_->load() > 0);
-    numberTopicPartitions_->fetch_sub(1);
-
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
-                                                                 << result);
-    }
-
-    // closed all consumers
-    if (numberTopicPartitions_->load() == 0) {
-        messages_.clear();
-        consumers_.clear();
-        topicsPartitions_.clear();
-        unAckedMessageTrackerPtr_->clear();
-
-        if (state_ != Failed) {
-            state_ = Closed;
-        }
-
-        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
-        return;
-    }
-}
-
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d2ede4a8770..0f111110c44 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -105,6 +105,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     MessageListener messageListener_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string>& topics_;
@@ -113,7 +114,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
-    void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 3a784675dd1..184ca6a2836 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -70,6 +70,17 @@ class SynchronizedHashMap {
         data_.clear();
     }
 
+    // Clear the map, calling `f(key, value)` on each entry just before it is erased (the lock is held)
+    void clear(std::function<void(const K&, const V&)> f) {
+        Lock lock(mutex_);
+        auto it = data_.begin();
+        while (it != data_.end()) {
+            f(it->first, it->second);
+            auto next = data_.erase(it);
+            it = next;
+        }
+    }
+
     OptValue find(const K& key) const {
         Lock lock(mutex_);
         auto it = data_.find(key);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 00696b9a5a4..58c889f074a 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -262,6 +262,17 @@ TEST(ClientTest, testWrongListener) {
 
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
+    Consumer multiTopicsConsumer;
+    ASSERT_EQ(ResultServiceUnitNotReady,
+              client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
+                               "sub", multiTopicsConsumer));
+
+    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
+
+    // Re-create the client because the previous one was closed above
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
     // Currently Reader can only read a non-partitioned topic in C++ client
     Reader reader;
     ASSERT_EQ(ResultServiceUnitNotReady,
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
index 62c55c46c8e..8d74a24014a 100644
--- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -40,9 +40,16 @@ inline PairVector sort(PairVector pairs) {
 }
 
 TEST(SynchronizedHashMap, testClear) {
-    SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
+    SyncMapType m({{1, 100}, {2, 200}});
     m.clear();
     ASSERT_EQ(m.toPairVector(), PairVector{});
+
+    PairVector expectedPairs({{3, 300}, {4, 400}});
+    SyncMapType m2(expectedPairs);
+    PairVector pairs;
+    m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); });
+    ASSERT_EQ(m2.toPairVector(), PairVector{});
+    ASSERT_EQ(sort(pairs), expectedPairs);
 }
 
 TEST(SynchronizedHashMap, testRemoveAndFind) {