You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/16 08:43:22 UTC

[pulsar] 01/02: [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6e840ed11552e1f50c2d0214a6c90f204c437d6f
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Aug 13 10:36:51 2022 +0800

    [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
    
    ### Motivation
    
    Currently, `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl` take a lock around many accesses to `state_` just to keep reads and writes of it atomic. Making `state_` an atomic variable removes the need for those locks and improves performance.
    
    ### Modifications
    
    Use an atomic `state_` instead of the lock to improve performance.
    
    (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 75 ++++++----------------
 pulsar-client-cpp/lib/HandlerBase.cc               | 11 ++--
 pulsar-client-cpp/lib/HandlerBase.h                |  2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 71 ++++++--------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  5 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              | 59 ++++++-----------
 7 files changed, 69 insertions(+), 159 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1f4a6b5b988..7c1c89d93f8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -157,10 +157,7 @@ void ConsumerImpl::start() {
 }
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
-    const auto state = state_;
-    lock.unlock();
-    if (state == Closed) {
+    if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
         return;
     }
@@ -198,7 +195,6 @@ void ConsumerImpl::connectionFailed(Result result) {
     ConsumerImplPtr ptr = shared_from_this();
 
     if (consumerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -271,15 +267,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
 void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO(getName() << "Unsubscribing");
 
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultAlreadyClosed);
         LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
                                "then call unsubscribe");
         return;
     }
 
+    Lock lock(mutex_);
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -300,7 +296,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Unsubscribed successfully");
     } else {
@@ -630,12 +625,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -653,12 +646,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 }
 
 Result ConsumerImpl::receiveHelper(Message& msg) {
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
+
     if (messageListener_) {
         LOG_ERROR(getName() << "Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
@@ -685,11 +676,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
@@ -864,13 +852,10 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(mutex_);
-
     // Keep a reference to ensure object is kept alive
     ConsumerImplPtr ptr = shared_from_this();
 
     if (state_ != Ready) {
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -888,7 +873,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
         // If connection is gone, also the consumer is closed on the broker side
         if (callback) {
             callback(ResultOk);
@@ -899,7 +883,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -907,8 +890,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // Lock is no longer required
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -924,9 +905,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
-        lock.unlock();
 
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
@@ -946,22 +925,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    Lock lock(mutex_);
     state_ = Closed;
-    lock.unlock();
 
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
 }
 
-bool ConsumerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool ConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool ConsumerImpl::isOpen() { return state_ == Ready; }
 
 Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
@@ -1024,14 +995,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
 int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         LOG_ERROR(getName() << "Client connection is not open, please try again later.")
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
+    Lock lock(mutex_);
     if (brokerConsumerStats_.isValid()) {
         LOG_DEBUG(getName() << "Serving data from cache");
         BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1091,16 +1061,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     this->ackGroupingTrackerPtr_->flushAndClean();
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1124,16 +1092,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1197,16 +1163,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 
 void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed, MessageId());
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1252,10 +1216,7 @@ void ConsumerImpl::trackMessage(const Message& msg) {
     }
 }
 
-bool ConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index d7025ad004b..5d2244f7552 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
 void HandlerBase::start() {
-    Lock lock(mutex_);
     // guard against concurrent state changes such as closing
-    if (state_ == NotStarted) {
-        state_ = Pending;
-        lock.unlock();
-
+    State state = NotStarted;
+    if (state_.compare_exchange_strong(state, Pending)) {
         grabCnx();
     }
 }
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
         return;
     }
 
-    Lock lock(handler->mutex_);
     State state = handler->state_;
 
     ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
 }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
-    if (handler->state_ == Pending || handler->state_ == Ready) {
+    const auto state = handler->state_.load();
+    if (state == Pending || state == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
         LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index eeb8ebe1c5e..1184746da21 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
         Failed
     };
 
-    State state_;
+    std::atomic<State> state_;
     Backoff backoff_;
     uint64_t epoch_;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 87166d0d360..d4b48681039 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -63,7 +63,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
             return;
@@ -91,14 +92,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     (*topicsNeedCreate)--;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
     }
 
     LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
 
     if (topicsNeedCreate->load() == 0) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
@@ -122,7 +124,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    if (state_ == Closed || state_ == Closing) {
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
         return topicPromise->getFuture();
@@ -238,15 +241,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
-    Lock lock(mutex_);
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_INFO(consumerStr_ << " already closed");
-        lock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
     state_ = Closing;
-    lock.unlock();
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
     auto self = shared_from_this();
@@ -270,7 +271,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " subscription - " << subscriptionName_);
     }
@@ -282,7 +283,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
         unAckedMessageTrackerPtr_->clear();
 
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        setState(Closed);
+        state_ = Closed;
         callback(result1);
         return;
     }
@@ -301,7 +302,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
     int numberPartitions = it->second;
     lock.unlock();
 
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
                                                                            << subscriptionName_);
         callback(ResultAlreadyClosed);
@@ -337,7 +339,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " topicPartitionName - " << topicPartitionName);
     }
@@ -369,7 +371,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
         if (callback) {
@@ -378,7 +381,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    setState(Closing);
+    state_ = Closing;
 
     auto self = shared_from_this();
     int numConsumers = 0;
@@ -392,7 +395,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -414,7 +417,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
     numberTopicPartitions_->fetch_sub(1);
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
                                                                  << result);
     }
@@ -476,18 +479,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    lock.unlock();
     messages_.pop(msg);
 
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -495,19 +494,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
 
-    lock.unlock();
     if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -520,12 +515,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -590,30 +583,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
-void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-}
-
-bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
-                                                 MultiTopicsConsumerState update) {
-    Lock lock(mutex_);
-    if (state_ == expect) {
-        state_ = update;
-        return true;
-    } else {
-        return false;
-    }
-}
-
 void MultiTopicsConsumerImpl::shutdown() {}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool MultiTopicsConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
 
 void MultiTopicsConsumerImpl::receiveMessages() {
     const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -663,12 +637,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -742,11 +715,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    lock.unlock();
 
     return consumers_
         .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index f69c56343e4..bfd92582c75 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -106,7 +106,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    MultiTopicsConsumerState state_ = Pending;
+    std::atomic<MultiTopicsConsumerState> state_{Pending};
     BlockingQueue<Message> messages_;
     const ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
@@ -120,9 +120,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
-    void setState(MultiTopicsConsumerState state);
-    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
-
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
     void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 8e92fd318d6..1183adfc4f1 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
         return;
     }
 
-    if (state_ != Ready) {
-        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+    const auto state = state_.load();
+    if (state != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state);
         resetAutoDiscoveryTimer();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e15d388ef64..a446c2b4b25 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -136,13 +136,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
     if (state_ == Closed) {
-        lock.unlock();
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
         return;
     }
-    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
@@ -164,7 +161,6 @@ void ProducerImpl::connectionFailed(Result result) {
         // so don't change the state and allow reconnections
         return;
     } else if (producerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -175,14 +171,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
     // while waiting for this response if using lazy producers
-    Lock lock(mutex_);
-    if (state_ != Ready && state_ != Pending) {
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
         return;
     }
 
     if (result == ResultOk) {
+        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -217,8 +214,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
-        lock.unlock();
-
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -248,7 +243,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
                 failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
-                Lock lock(mutex_);
                 state_ = Failed;
             }
         }
@@ -334,9 +328,8 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
-
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend(callback);
             lock.unlock();
             failures.complete();
@@ -350,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend();
             lock.unlock();
             failures.complete();
@@ -551,8 +544,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
 
     // ignore if the producer is already closing/closed
-    Lock lock(mutex_);
-    if (state_ == Pending || state_ == Ready) {
+    const auto state = state_.load();
+    if (state == Pending || state == Ready) {
+        Lock lock(mutex_);
         auto failures = batchMessageAndSend();
         lock.unlock();
         failures.complete();
@@ -569,11 +563,9 @@ void ProducerImpl::printStats() {
 }
 
 void ProducerImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-
     // if the producer was never started then there is nothing to clean up
-    if (state_ == NotStarted) {
-        state_ = Closed;
+    State expectedState = NotStarted;
+    if (state_.compare_exchange_strong(expectedState, Closed)) {
         callback(ResultOk);
         return;
     }
@@ -586,9 +578,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // ensure any remaining send callbacks are called before calling the close callback
     failPendingMessages(ResultAlreadyClosed, false);
 
-    if (state_ != Ready && state_ != Pending) {
+    // TODO  maybe we need a loop here to implement CAS for a condition,
+    // just like Java's `getAndUpdate` method on an atomic variable
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         state_ = Closed;
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -601,7 +595,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
+
         if (callback) {
             callback(ResultOk);
         }
@@ -615,7 +609,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -623,7 +616,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
@@ -636,7 +628,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
 void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Closed producer");
         ClientConnectionPtr cnx = getCnx().lock();
@@ -659,10 +650,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture()
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
-    Lock lock(mutex_);
-    if (state_ != Pending && state_ != Ready) {
+    const auto state = state_.load();
+    if (state != Pending && state != Ready) {
         return;
     }
+    Lock lock(mutex_);
 
     if (err == boost::asio::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -837,22 +829,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr
     return a->getProducerId() < b->getProducerId();
 }
 
-bool ProducerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ProducerImpl::isClosed() { return state_ == Closed; }
 
-bool ProducerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
-bool ProducerImpl::isStarted() const {
-    Lock lock(mutex_);
-    return state_ != NotStarted;
-}
+bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
 void ProducerImpl::startSendTimeoutTimer() {
     // Initialize the sendTimer only once per producer and only when producer timeout is
     // configured. Set the timeout as configured value and asynchronously wait for the