Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/16 08:43:21 UTC

[pulsar] branch branch-2.8 updated (367fdeb7d85 -> 2c5965ad4d2)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 367fdeb7d85 [C++] Connect timer cancellation does not call timeout callback (#11486)
     new 6e840ed1155 [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
     new 2c5965ad4d2 [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  75 +++-------
 pulsar-client-cpp/lib/HandlerBase.cc               |  11 +-
 pulsar-client-cpp/lib/HandlerBase.h                |   2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 166 +++++++++------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   7 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |   5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              |  59 +++-----
 pulsar-client-cpp/lib/SynchronizedHashMap.h        |  11 ++
 pulsar-client-cpp/tests/ClientTest.cc              |  11 ++
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |   9 +-
 10 files changed, 147 insertions(+), 209 deletions(-)


[pulsar] 01/02: [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6e840ed11552e1f50c2d0214a6c90f204c437d6f
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Aug 13 10:36:51 2022 +0800

    [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
    
    ### Motivation
    
    Currently, a lot of locks are used to ensure the atomicity of `state_` in `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`. We can use an atomic `state_` instead of the locks to improve performance.
    
    ### Modifications
    
    Use an atomic `state_` instead of the lock to improve performance.
    
    (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 75 ++++++----------------
 pulsar-client-cpp/lib/HandlerBase.cc               | 11 ++--
 pulsar-client-cpp/lib/HandlerBase.h                |  2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 71 ++++++--------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  5 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              | 59 ++++++-----------
 7 files changed, 69 insertions(+), 159 deletions(-)
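
For readers skimming the diff below, this is the pattern being applied: a mutex-guarded
state field becomes a std::atomic, with compare_exchange_strong for read-modify-write
transitions and plain loads for checks. A minimal standalone sketch (illustrative class
and member names, not the Pulsar sources):

    #include <atomic>
    #include <iostream>

    enum State { NotStarted, Pending, Ready, Closing, Closed, Failed };

    class Handler {
       public:
        // Before: Lock lock(mutex_); if (state_ == NotStarted) { state_ = Pending; ... }
        // After: one compare-and-swap; only the thread that wins the CAS proceeds.
        void start() {
            State expected = NotStarted;
            if (state_.compare_exchange_strong(expected, Pending)) {
                std::cout << "connecting..." << std::endl;  // stands in for grabCnx()
            }
        }

        // Reads need no lock either: an atomic load (seq_cst by default) suffices.
        bool isOpen() const { return state_.load() == Ready; }

       private:
        std::atomic<State> state_{NotStarted};
    };

    int main() {
        Handler h;
        h.start();
        std::cout << std::boolalpha << h.isOpen() << std::endl;  // false: still Pending
    }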

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1f4a6b5b988..7c1c89d93f8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -157,10 +157,7 @@ void ConsumerImpl::start() {
 }
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
-    const auto state = state_;
-    lock.unlock();
-    if (state == Closed) {
+    if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
         return;
     }
@@ -198,7 +195,6 @@ void ConsumerImpl::connectionFailed(Result result) {
     ConsumerImplPtr ptr = shared_from_this();
 
     if (consumerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -271,15 +267,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
 void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO(getName() << "Unsubscribing");
 
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultAlreadyClosed);
         LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
                                "then call unsubscribe");
         return;
     }
 
+    Lock lock(mutex_);
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -300,7 +296,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Unsubscribed successfully");
     } else {
@@ -630,12 +625,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -653,12 +646,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 }
 
 Result ConsumerImpl::receiveHelper(Message& msg) {
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
+
     if (messageListener_) {
         LOG_ERROR(getName() << "Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
@@ -685,11 +676,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
@@ -864,13 +852,10 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(mutex_);
-
     // Keep a reference to ensure object is kept alive
     ConsumerImplPtr ptr = shared_from_this();
 
     if (state_ != Ready) {
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -888,7 +873,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
         // If connection is gone, also the consumer is closed on the broker side
         if (callback) {
             callback(ResultOk);
@@ -899,7 +883,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -907,8 +890,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // Lock is no longer required
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -924,9 +905,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
-        lock.unlock();
 
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
@@ -946,22 +925,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    Lock lock(mutex_);
     state_ = Closed;
-    lock.unlock();
 
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
 }
 
-bool ConsumerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool ConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool ConsumerImpl::isOpen() { return state_ == Ready; }
 
 Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
@@ -1024,14 +995,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
 int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         LOG_ERROR(getName() << "Client connection is not open, please try again later.")
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
+    Lock lock(mutex_);
     if (brokerConsumerStats_.isValid()) {
         LOG_DEBUG(getName() << "Serving data from cache");
         BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1091,16 +1061,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     this->ackGroupingTrackerPtr_->flushAndClean();
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1124,16 +1092,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1197,16 +1163,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 
 void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed, MessageId());
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1252,10 +1216,7 @@ void ConsumerImpl::trackMessage(const Message& msg) {
     }
 }
 
-bool ConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index d7025ad004b..5d2244f7552 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
 void HandlerBase::start() {
-    Lock lock(mutex_);
     // guard against concurrent state changes such as closing
-    if (state_ == NotStarted) {
-        state_ = Pending;
-        lock.unlock();
-
+    State state = NotStarted;
+    if (state_.compare_exchange_strong(state, Pending)) {
         grabCnx();
     }
 }
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
         return;
     }
 
-    Lock lock(handler->mutex_);
     State state = handler->state_;
 
     ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
 }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
-    if (handler->state_ == Pending || handler->state_ == Ready) {
+    const auto state = handler->state_.load();
+    if (state == Pending || state == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
         LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index eeb8ebe1c5e..1184746da21 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
         Failed
     };
 
-    State state_;
+    std::atomic<State> state_;
     Backoff backoff_;
     uint64_t epoch_;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 87166d0d360..d4b48681039 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -63,7 +63,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
             return;
@@ -91,14 +92,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     (*topicsNeedCreate)--;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
     }
 
     LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
 
     if (topicsNeedCreate->load() == 0) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
@@ -122,7 +124,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    if (state_ == Closed || state_ == Closing) {
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
         return topicPromise->getFuture();
@@ -238,15 +241,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
-    Lock lock(mutex_);
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_INFO(consumerStr_ << " already closed");
-        lock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
     state_ = Closing;
-    lock.unlock();
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
     auto self = shared_from_this();
@@ -270,7 +271,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " subscription - " << subscriptionName_);
     }
@@ -282,7 +283,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
         unAckedMessageTrackerPtr_->clear();
 
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        setState(Closed);
+        state_ = Closed;
         callback(result1);
         return;
     }
@@ -301,7 +302,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
     int numberPartitions = it->second;
     lock.unlock();
 
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
                                                                            << subscriptionName_);
         callback(ResultAlreadyClosed);
@@ -337,7 +339,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " topicPartitionName - " << topicPartitionName);
     }
@@ -369,7 +371,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
         if (callback) {
@@ -378,7 +381,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    setState(Closing);
+    state_ = Closing;
 
     auto self = shared_from_this();
     int numConsumers = 0;
@@ -392,7 +395,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -414,7 +417,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
     numberTopicPartitions_->fetch_sub(1);
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
                                                                  << result);
     }
@@ -476,18 +479,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    lock.unlock();
     messages_.pop(msg);
 
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -495,19 +494,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
 
-    lock.unlock();
     if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -520,12 +515,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -590,30 +583,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
-void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-}
-
-bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
-                                                 MultiTopicsConsumerState update) {
-    Lock lock(mutex_);
-    if (state_ == expect) {
-        state_ = update;
-        return true;
-    } else {
-        return false;
-    }
-}
-
 void MultiTopicsConsumerImpl::shutdown() {}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool MultiTopicsConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
 
 void MultiTopicsConsumerImpl::receiveMessages() {
     const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -663,12 +637,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -742,11 +715,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    lock.unlock();
 
     return consumers_
         .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index f69c56343e4..bfd92582c75 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -106,7 +106,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    MultiTopicsConsumerState state_ = Pending;
+    std::atomic<MultiTopicsConsumerState> state_{Pending};
     BlockingQueue<Message> messages_;
     const ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
@@ -120,9 +120,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
-    void setState(MultiTopicsConsumerState state);
-    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
-
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
     void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 8e92fd318d6..1183adfc4f1 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
         return;
     }
 
-    if (state_ != Ready) {
-        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+    const auto state = state_.load();
+    if (state != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state);
         resetAutoDiscoveryTimer();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e15d388ef64..a446c2b4b25 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -136,13 +136,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
     if (state_ == Closed) {
-        lock.unlock();
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
         return;
     }
-    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
@@ -164,7 +161,6 @@ void ProducerImpl::connectionFailed(Result result) {
         // so don't change the state and allow reconnections
         return;
     } else if (producerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -175,14 +171,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
     // while waiting for this response if using lazy producers
-    Lock lock(mutex_);
-    if (state_ != Ready && state_ != Pending) {
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
         return;
     }
 
     if (result == ResultOk) {
+        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -217,8 +214,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
-        lock.unlock();
-
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -248,7 +243,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
                 failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
-                Lock lock(mutex_);
                 state_ = Failed;
             }
         }
@@ -334,9 +328,8 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
-
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend(callback);
             lock.unlock();
             failures.complete();
@@ -350,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend();
             lock.unlock();
             failures.complete();
@@ -551,8 +544,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
 
     // ignore if the producer is already closing/closed
-    Lock lock(mutex_);
-    if (state_ == Pending || state_ == Ready) {
+    const auto state = state_.load();
+    if (state == Pending || state == Ready) {
+        Lock lock(mutex_);
         auto failures = batchMessageAndSend();
         lock.unlock();
         failures.complete();
@@ -569,11 +563,9 @@ void ProducerImpl::printStats() {
 }
 
 void ProducerImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-
     // if the producer was never started then there is nothing to clean up
-    if (state_ == NotStarted) {
-        state_ = Closed;
+    State expectedState = NotStarted;
+    if (state_.compare_exchange_strong(expectedState, Closed)) {
         callback(ResultOk);
         return;
     }
@@ -586,9 +578,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // ensure any remaining send callbacks are called before calling the close callback
     failPendingMessages(ResultAlreadyClosed, false);
 
-    if (state_ != Ready && state_ != Pending) {
+    // TODO  maybe we need a loop here to implement CAS for a condition,
+    // just like Java's `getAndUpdate` method on an atomic variable
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         state_ = Closed;
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -601,7 +595,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
+
         if (callback) {
             callback(ResultOk);
         }
@@ -615,7 +609,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -623,7 +616,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
@@ -636,7 +628,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
 void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Closed producer");
         ClientConnectionPtr cnx = getCnx().lock();
@@ -659,10 +650,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture()
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
-    Lock lock(mutex_);
-    if (state_ != Pending && state_ != Ready) {
+    const auto state = state_.load();
+    if (state != Pending && state != Ready) {
         return;
     }
+    Lock lock(mutex_);
 
     if (err == boost::asio::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -837,22 +829,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr
     return a->getProducerId() < b->getProducerId();
 }
 
-bool ProducerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ProducerImpl::isClosed() { return state_ == Closed; }
 
-bool ProducerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
-bool ProducerImpl::isStarted() const {
-    Lock lock(mutex_);
-    return state_ != NotStarted;
-}
+bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
 void ProducerImpl::startSendTimeoutTimer() {
     // Initialize the sendTimer only once per producer and only when producer timeout is
     // configured. Set the timeout as configured value and asynchronously wait for the
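
An aside on the TODO left in ProducerImpl::closeAsync above: once the lock is gone,
"load state_, test it, then store Closed" is no longer a single atomic step. The comment
suggests a loop like Java's AtomicReference::getAndUpdate; a hedged sketch of what such a
retry loop could look like in C++ (illustrative only, not part of this patch):

    #include <atomic>

    enum State { NotStarted, Pending, Ready, Closing, Closed, Failed };

    // Atomically replace the value with update(current) and return the old value,
    // retrying until the CAS succeeds -- the shape of Java's getAndUpdate.
    template <typename F>
    State getAndUpdate(std::atomic<State>& state, F update) {
        State current = state.load();
        // On failure compare_exchange_weak refreshes `current`; recompute and retry.
        while (!state.compare_exchange_weak(current, update(current))) {
        }
        return current;
    }

    int main() {
        std::atomic<State> state{Ready};
        State prev = getAndUpdate(state, [](State s) {
            // e.g. a close transition: Ready/Pending move to Closing, others to Closed
            return (s == Ready || s == Pending) ? Closing : Closed;
        });
        return prev == Ready ? 0 : 1;
    }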


[pulsar] 02/02: [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2c5965ad4d2eb10b63a293d545e88264878a7a4f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Aug 25 10:20:51 2022 +0800

    [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)
    
    (cherry picked from commit 40d2ae3ac7ea11c8200ea104efcc3c587425b800)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 97 +++++++++++-----------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  2 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 11 +++
 pulsar-client-cpp/tests/ClientTest.cc              | 11 +++
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |  9 +-
 5 files changed, 79 insertions(+), 51 deletions(-)
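
The crash being fixed appears to come from close-path callbacks running against a
consumer whose lifetime has already ended. The patch below addresses it by capturing a
weak_ptr in those callbacks (bailing out if the object is gone) and by clearing the
consumer map in one locked pass instead of removing entries while iterating over it. A
minimal standalone sketch of the weak_ptr half of that pattern (illustrative names, not
the Pulsar API):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    // Deferred callbacks, standing in for the client's executor/network threads.
    std::vector<std::function<void()>> pendingCallbacks;

    class Consumer : public std::enable_shared_from_this<Consumer> {
       public:
        void closeAsync() {
            // Capture a weak_ptr, not shared_from_this(): the callback must not
            // extend the consumer's lifetime, and must tolerate it being gone.
            std::weak_ptr<Consumer> weakSelf{shared_from_this()};
            pendingCallbacks.emplace_back([weakSelf] {
                auto self = weakSelf.lock();
                if (!self) {
                    return;  // consumer already destroyed: safely do nothing
                }
                self->onClosed();
            });
        }

       private:
        void onClosed() { std::cout << "closed" << std::endl; }
    };

    int main() {
        auto consumer = std::make_shared<Consumer>();
        consumer->closeAsync();
        consumer.reset();  // destroy the consumer before the callback runs
        for (auto& cb : pendingCallbacks) cb();  // no crash: weak_ptr is expired
    }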

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index d4b48681039..5fe9446b186 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -89,16 +89,17 @@ void MultiTopicsConsumerImpl::start() {
 void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
                                                        const std::string& topic,
                                                        std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
-    (*topicsNeedCreate)--;
-
     if (result != ResultOk) {
         state_ = Failed;
+        // Use the first failed result
+        auto expectedResult = ResultOk;
+        failedResult.compare_exchange_strong(expectedResult, result);
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+    } else {
+        LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
     }
 
-    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
-
-    if (topicsNeedCreate->load() == 0) {
+    if (--(*topicsNeedCreate) == 0) {
         MultiTopicsConsumerState state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
@@ -106,11 +107,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            closeAsync(nullptr);
-            multiTopicsConsumerCreatedPromise_.setFailed(result);
-            return;
+            // It's safe to capture only this here, because the callback can be called only when this is valid
+            closeAsync(
+                [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
         }
-        return;
     }
 }
 
@@ -383,13 +383,47 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     state_ = Closing;
 
-    auto self = shared_from_this();
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
     int numConsumers = 0;
-    consumers_.forEach(
-        [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+    consumers_.clear(
+        [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
             numConsumers++;
-            consumer->closeAsync([self, name, callback](Result result) {
-                self->handleSingleConsumerClose(result, name, callback);
+            consumer->closeAsync([this, weakSelf, name, callback](Result result) {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+                LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
+                                                                  << numberTopicPartitions_->load());
+                const int numConsumersLeft = --*numberTopicPartitions_;
+                if (numConsumersLeft < 0) {
+                    LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
+                                  << " during close");
+                    return;
+                }
+                if (result != ResultOk) {
+                    state_ = Failed;
+                    LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+                                                                             << result);
+                }
+                // closed all consumers
+                if (numConsumersLeft == 0) {
+                    messages_.clear();
+                    topicsPartitions_.clear();
+                    unAckedMessageTrackerPtr_->clear();
+
+                    if (state_ != Failed) {
+                        state_ = Closed;
+                    }
+
+                    if (callback) {
+                        callback(result);
+                    }
+                }
             });
         });
     if (numConsumers == 0) {
@@ -406,41 +440,6 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     failPendingReceiveCallback();
 }
 
-void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName,
-                                                        CloseCallback callback) {
-    consumers_.remove(topicPartitionName);
-
-    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
-                                                      << numberTopicPartitions_->load());
-
-    assert(numberTopicPartitions_->load() > 0);
-    numberTopicPartitions_->fetch_sub(1);
-
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
-                                                                 << result);
-    }
-
-    // closed all consumers
-    if (numberTopicPartitions_->load() == 0) {
-        messages_.clear();
-        consumers_.clear();
-        topicsPartitions_.clear();
-        unAckedMessageTrackerPtr_->clear();
-
-        if (state_ != Failed) {
-            state_ = Closed;
-        }
-
-        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
-        return;
-    }
-}
-
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index bfd92582c75..95c24f68c5b 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -114,6 +114,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     boost::posix_time::time_duration partitionsUpdateInterval_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string> topics_;
@@ -122,7 +123,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
-    void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 3a784675dd1..184ca6a2836 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -70,6 +70,17 @@ class SynchronizedHashMap {
         data_.clear();
     }
 
+    // clear the map and apply `f` on each removed value
+    void clear(std::function<void(const K&, const V&)> f) {
+        Lock lock(mutex_);
+        auto it = data_.begin();
+        while (it != data_.end()) {
+            f(it->first, it->second);
+            auto next = data_.erase(it);
+            it = next;
+        }
+    }
+
     OptValue find(const K& key) const {
         Lock lock(mutex_);
         auto it = data_.find(key);
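
The clear overload added above erases entries one at a time, so `f` runs per entry while
the iterator stays valid. The same idiom on a plain std::map, as a standalone sketch (the
real class wraps this in its own mutex):

    #include <functional>
    #include <iostream>
    #include <map>

    template <typename K, typename V>
    void clearAndVisit(std::map<K, V>& data, const std::function<void(const K&, const V&)>& f) {
        auto it = data.begin();
        while (it != data.end()) {
            f(it->first, it->second);
            it = data.erase(it);  // erase returns the next valid iterator
        }
    }

    int main() {
        std::map<int, int> m{{1, 100}, {2, 200}};
        clearAndVisit(m, [](const int& k, const int& v) { std::cout << k << " -> " << v << "\n"; });
    }
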
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 14c3af70aa3..2bc66c6da2b 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -249,6 +249,17 @@ TEST(ClientTest, testWrongListener) {
 
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
+    Consumer multiTopicsConsumer;
+    ASSERT_EQ(ResultServiceUnitNotReady,
+              client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
+                               "sub", multiTopicsConsumer));
+
+    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
+
+    // Currently Reader can only read a non-partitioned topic in C++ client
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
     // Currently Reader can only read a non-partitioned topic in C++ client
     Reader reader;
     ASSERT_EQ(ResultServiceUnitNotReady,
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
index 62c55c46c8e..8d74a24014a 100644
--- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -40,9 +40,16 @@ inline PairVector sort(PairVector pairs) {
 }
 
 TEST(SynchronizedHashMap, testClear) {
-    SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
+    SyncMapType m({{1, 100}, {2, 200}});
     m.clear();
     ASSERT_EQ(m.toPairVector(), PairVector{});
+
+    PairVector expectedPairs({{3, 300}, {4, 400}});
+    SyncMapType m2(expectedPairs);
+    PairVector pairs;
+    m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); });
+    ASSERT_EQ(m2.toPairVector(), PairVector{});
+    ASSERT_EQ(sort(pairs), expectedPairs);
 }
 
 TEST(SynchronizedHashMap, testRemoveAndFind) {