Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/08/25 14:27:57 UTC

[pulsar] branch branch-2.9 updated (b78c3894bcc -> 5f00aa58a37)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from b78c3894bcc [fix][tool] Using int instead of long in python scripts (#17215)
     new f3899d53959 [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)
     new 66fbdab74b3 [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
     new 5f00aa58a37 [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-client-cpp/lib/ClientImpl.cc                |   4 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  75 +++-------
 pulsar-client-cpp/lib/HandlerBase.cc               |  11 +-
 pulsar-client-cpp/lib/HandlerBase.h                |   2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 166 +++++++++------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   7 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  77 +++-------
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   3 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |   5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              |  59 +++-----
 pulsar-client-cpp/lib/SynchronizedHashMap.h        |  11 ++
 pulsar-client-cpp/tests/ClientTest.cc              |  19 ++-
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |   9 +-
 13 files changed, 175 insertions(+), 273 deletions(-)


[pulsar] 01/03: [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3899d53959971b56ddce8e4809b6c759bc1d061
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jul 13 09:16:06 2022 +0800

    [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)
    
    Fixes #16509
    
    ### Motivation
    
    Since all producers and consumers of the same Client share the same
    connection pool, if one fails, a following producer or consumer might
    reuse the broken connection and fail with `ResultConnectError`. This
    causes the flaky `testWrongListener`, because the consumer and reader
    are created after the producer creation fails with
    `ResultServiceNotReady`.
    
    ### Modifications
    
    Recreate the `Client` for the subsequent creation of `Consumer` and
    `Reader` so that new connection pools will be used for them.
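    
    For illustration, a minimal sketch of this pattern in client code (the
    service URL and topic are placeholders, and error handling is elided):
    
    ```cpp
    #include <pulsar/Client.h>
    
    using namespace pulsar;
    
    int main() {
        const std::string url = "pulsar://localhost:6650";  // placeholder URL
        Client client(url, ClientConfiguration().setListenerName("test"));
    
        // A failed producer creation may leave a broken pooled connection behind.
        Producer producer;
        if (client.createProducer("my-topic", producer) != ResultOk) {
            client.close();
            // Recreate the Client so the consumer gets a fresh connection pool,
            // instead of reusing the broken connection (ResultConnectError).
            client = Client(url, ClientConfiguration().setListenerName("test"));
        }
    
        Consumer consumer;
        client.subscribe("my-topic", "sub", consumer);
        client.close();
        return 0;
    }
    ```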
    
    There is also a potential bug that makes `Client::shutdown` wait for the
    max timeout (3 seconds), so this PR also applies the remaining timeout to
    the other executors.
    
    ### Verifying this change
    
    After this change, I ran the following command in my local environment and
    it never failed.
    
    ```bash
    # rerun testWrongListener 20 times
    ./tests/main --gtest_filter='ClientTest.testWrongListener' --gtest_repeat=20
    ```
    
    (cherry picked from commit 10307f90ab9b0d168a5a40f40027177fae587602)
---
 pulsar-client-cpp/lib/ClientImpl.cc   | 4 ++--
 pulsar-client-cpp/tests/ClientTest.cc | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 7c9a3a18f4a..d419ffe7dc5 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -609,12 +609,12 @@ void ClientImpl::shutdown() {
     LOG_DEBUG("ioExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    listenerExecutorProvider_->close();
+    listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    partitionListenerExecutorProvider_->close();
+    partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
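
The hunk above spends a single shared timeout budget across the executor
providers: each `close()` receives only the time left after the previous
steps. A minimal sketch of such a budget helper, assuming `tik()`/`tok()`/
`getLeftTimeout()` behave as they are used above (the real `TimeoutProcessor`
internals are not part of this diff):

```cpp
#include <chrono>

// Hypothetical sketch of a shared close-timeout budget, mirroring the
// tik()/tok()/getLeftTimeout() usage in the hunk above.
class TimeoutBudget {
   public:
    explicit TimeoutBudget(std::chrono::milliseconds total) : left_(total) {}

    // Mark the start of one timed step.
    void tik() { start_ = std::chrono::steady_clock::now(); }

    // Subtract the elapsed time of the step from the remaining budget.
    void tok() {
        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start_);
        left_ = (elapsed < left_) ? (left_ - elapsed)
                                  : std::chrono::milliseconds(0);
    }

    // The remaining budget, passed to the next close() call.
    std::chrono::milliseconds getLeftTimeout() const { return left_; }

   private:
    std::chrono::milliseconds left_;
    std::chrono::steady_clock::time_point start_;
};

// Usage mirrors ClientImpl::shutdown() above:
//   TimeoutBudget budget(std::chrono::milliseconds(3000));
//   budget.tik();
//   listenerExecutorProvider_->close(budget.getLeftTimeout());
//   budget.tok();
```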
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index dd073fb6bfc..14c3af70aa3 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -234,17 +234,19 @@ TEST(ClientTest, testWrongListener) {
     Producer producer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
     ASSERT_EQ(ResultProducerNotInitialized, producer.close());
+    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
 
+    // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
+    // creation of Consumer or Reader could fail with ResultConnectError.
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
     Consumer consumer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
     ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
 
-    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
     ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
     ASSERT_EQ(ResultOk, client.close());
 
-    // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the
-    // creation of Reader would fail with ResultConnectError.
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
     // Currently Reader can only read a non-partitioned topic in C++ client


[pulsar] 03/03: [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f00aa58a371c6b520a689fb6c843a7ad2a609b6
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Aug 25 10:20:51 2022 +0800

    [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)
    
    (cherry picked from commit 40d2ae3ac7ea11c8200ea104efcc3c587425b800)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 97 +++++++++++-----------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  2 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 11 +++
 pulsar-client-cpp/tests/ClientTest.cc              | 11 +++
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |  9 +-
 5 files changed, 79 insertions(+), 51 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 293ece4c838..5ed66400983 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -82,16 +82,17 @@ void MultiTopicsConsumerImpl::start() {
 void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
                                                        const std::string& topic,
                                                        std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
-    (*topicsNeedCreate)--;
-
     if (result != ResultOk) {
         state_ = Failed;
+        // Use the first failed result
+        auto expectedResult = ResultOk;
+        failedResult.compare_exchange_strong(expectedResult, result);
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
+    } else {
+        LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
     }
 
-    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
-
-    if (topicsNeedCreate->load() == 0) {
+    if (--(*topicsNeedCreate) == 0) {
         MultiTopicsConsumerState state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
@@ -99,11 +100,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            closeAsync(nullptr);
-            multiTopicsConsumerCreatedPromise_.setFailed(result);
-            return;
+            // It's safe to capture only this here, because the callback can be called only when this is valid
+            closeAsync(
+                [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
         }
-        return;
     }
 }
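
A note on `--(*topicsNeedCreate) == 0` above: the old code decremented the
counter first and called `load()` later, so two callbacks could both observe
0 and both run the completion branch. Checking the value returned by the
decrement itself guarantees exactly one callback takes it. A standalone
sketch of the difference (names here are illustrative):

```cpp
#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    std::atomic<int> remaining{4};   // e.g. topics left to subscribe
    std::atomic<int> completions{0};

    std::vector<std::thread> threads;
    for (int i = 0; i < 4; i++) {
        threads.emplace_back([&] {
            // --remaining returns the new value, so it is 0 for exactly
            // one thread; a separate load() could read 0 in several.
            if (--remaining == 0) {
                completions++;
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    std::printf("completions = %d\n", completions.load());  // always 1
}
```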
 
@@ -364,13 +364,47 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     state_ = Closing;
 
-    auto self = shared_from_this();
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
     int numConsumers = 0;
-    consumers_.forEach(
-        [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+    consumers_.clear(
+        [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
             numConsumers++;
-            consumer->closeAsync([self, name, callback](Result result) {
-                self->handleSingleConsumerClose(result, name, callback);
+            consumer->closeAsync([this, weakSelf, name, callback](Result result) {
+                auto self = weakSelf.lock();
+                if (!self) {
+                    return;
+                }
+                LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
+                                                                  << numberTopicPartitions_->load());
+                const int numConsumersLeft = --*numberTopicPartitions_;
+                if (numConsumersLeft < 0) {
+                    LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
+                                  << " during close");
+                    return;
+                }
+                if (result != ResultOk) {
+                    state_ = Failed;
+                    LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+                                                                             << result);
+                }
+                // closed all consumers
+                if (numConsumersLeft == 0) {
+                    messages_.clear();
+                    topicsPartitions_.clear();
+                    unAckedMessageTrackerPtr_->clear();
+
+                    if (state_ != Failed) {
+                        state_ = Closed;
+                    }
+
+                    if (callback) {
+                        callback(result);
+                    }
+                }
             });
         });
     if (numConsumers == 0) {
@@ -387,41 +421,6 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     failPendingReceiveCallback();
 }
 
-void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName,
-                                                        CloseCallback callback) {
-    consumers_.remove(topicPartitionName);
-
-    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
-                                                      << numberTopicPartitions_->load());
-
-    assert(numberTopicPartitions_->load() > 0);
-    numberTopicPartitions_->fetch_sub(1);
-
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
-                                                                 << result);
-    }
-
-    // closed all consumers
-    if (numberTopicPartitions_->load() == 0) {
-        messages_.clear();
-        consumers_.clear();
-        topicsPartitions_.clear();
-        unAckedMessageTrackerPtr_->clear();
-
-        if (state_ != Failed) {
-            state_ = Closed;
-        }
-
-        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
-        return;
-    }
-}
-
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d2ede4a8770..0f111110c44 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -105,6 +105,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     MessageListener messageListener_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
+    std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string>& topics_;
@@ -113,7 +114,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
-    void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 3a784675dd1..184ca6a2836 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -70,6 +70,17 @@ class SynchronizedHashMap {
         data_.clear();
     }
 
+    // clear the map and apply `f` on each removed value
+    void clear(std::function<void(const K&, const V&)> f) {
+        Lock lock(mutex_);
+        auto it = data_.begin();
+        while (it != data_.end()) {
+            f(it->first, it->second);
+            auto next = data_.erase(it);
+            it = next;
+        }
+    }
+
     OptValue find(const K& key) const {
         Lock lock(mutex_);
         auto it = data_.find(key);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 14c3af70aa3..2bc66c6da2b 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -249,6 +249,17 @@ TEST(ClientTest, testWrongListener) {
 
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
+    Consumer multiTopicsConsumer;
+    ASSERT_EQ(ResultServiceUnitNotReady,
+              client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
+                               "sub", multiTopicsConsumer));
+
+    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
+
+    // Currently Reader can only read a non-partitioned topic in C++ client
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
+
     // Currently Reader can only read a non-partitioned topic in C++ client
     Reader reader;
     ASSERT_EQ(ResultServiceUnitNotReady,
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
index 62c55c46c8e..8d74a24014a 100644
--- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -40,9 +40,16 @@ inline PairVector sort(PairVector pairs) {
 }
 
 TEST(SynchronizedHashMap, testClear) {
-    SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
+    SyncMapType m({{1, 100}, {2, 200}});
     m.clear();
     ASSERT_EQ(m.toPairVector(), PairVector{});
+
+    PairVector expectedPairs({{3, 300}, {4, 400}});
+    SyncMapType m2(expectedPairs);
+    PairVector pairs;
+    m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); });
+    ASSERT_EQ(m2.toPairVector(), PairVector{});
+    ASSERT_EQ(sort(pairs), expectedPairs);
 }
 
 TEST(SynchronizedHashMap, testRemoveAndFind) {


[pulsar] 02/03: [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 66fbdab74b331581b5bf60da35501bf201d109a6
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Aug 13 10:36:51 2022 +0800

    [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
    
    ### Motivation
    
    Currently, many locks are taken only to guarantee the atomicity of `state_`
    in `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and
    `MultiTopicsConsumerImpl`. We can use an atomic `state_` instead of these
    locks to improve performance.
    
    ### Modifications
    
    Use an atomic `state_` instead of the lock to improve performance.
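    
    For illustration, the core of the change as a minimal sketch (this class is
    hypothetical; the real `state_` lives in `HandlerBase` and the
    consumer/producer implementations):
    
    ```cpp
    #include <atomic>
    
    enum State { NotStarted, Pending, Ready, Closing, Closed, Failed };
    
    class HandlerSketch {
       public:
        // Before: lock mutex_, compare state_, assign, unlock.
        // After: one compare-and-swap gives the same "start at most once"
        // guarantee without taking a lock.
        void start() {
            State expected = NotStarted;
            if (state_.compare_exchange_strong(expected, Pending)) {
                grabCnx();
            }
        }
    
        // Plain checks become lock-free atomic loads.
        bool isOpen() const { return state_ == Ready; }
    
       private:
        void grabCnx() { /* connect to the broker... */ }
        std::atomic<State> state_{NotStarted};
    };
    ```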
    
    (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 75 +++++----------------
 pulsar-client-cpp/lib/HandlerBase.cc               | 11 ++--
 pulsar-client-cpp/lib/HandlerBase.h                |  2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 71 ++++++--------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  5 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 77 ++++++----------------
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  3 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              | 59 ++++++-----------
 9 files changed, 90 insertions(+), 218 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 84583d521cc..111b9193aec 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -158,10 +158,7 @@ void ConsumerImpl::start() {
 }
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
-    const auto state = state_;
-    lock.unlock();
-    if (state == Closed) {
+    if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
         return;
     }
@@ -200,7 +197,6 @@ void ConsumerImpl::connectionFailed(Result result) {
     ConsumerImplPtr ptr = shared_from_this();
 
     if (consumerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -272,15 +268,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
 void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO(getName() << "Unsubscribing");
 
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultAlreadyClosed);
         LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
                                "then call unsubscribe");
         return;
     }
 
+    Lock lock(mutex_);
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -301,7 +297,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Unsubscribed successfully");
     } else {
@@ -650,12 +645,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -673,12 +666,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 }
 
 Result ConsumerImpl::receiveHelper(Message& msg) {
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
+
     if (messageListener_) {
         LOG_ERROR(getName() << "Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
@@ -705,11 +696,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
@@ -884,13 +872,10 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(mutex_);
-
     // Keep a reference to ensure object is kept alive
     ConsumerImplPtr ptr = shared_from_this();
 
     if (state_ != Ready) {
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -908,7 +893,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
         // If connection is gone, also the consumer is closed on the broker side
         if (callback) {
             callback(ResultOk);
@@ -919,7 +903,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -927,8 +910,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // Lock is no longer required
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -944,9 +925,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
-        lock.unlock();
 
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
@@ -966,22 +945,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    Lock lock(mutex_);
     state_ = Closed;
-    lock.unlock();
 
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
 }
 
-bool ConsumerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool ConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool ConsumerImpl::isOpen() { return state_ == Ready; }
 
 Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
@@ -1044,14 +1015,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
 int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         LOG_ERROR(getName() << "Client connection is not open, please try again later.")
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
+    Lock lock(mutex_);
     if (brokerConsumerStats_.isValid()) {
         LOG_DEBUG(getName() << "Serving data from cache");
         BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1111,16 +1081,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     this->ackGroupingTrackerPtr_->flushAndClean();
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1144,16 +1112,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1217,16 +1183,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 
 void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed, MessageId());
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1272,10 +1236,7 @@ void ConsumerImpl::trackMessage(const Message& msg) {
     }
 }
 
-bool ConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index d7025ad004b..5d2244f7552 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
 void HandlerBase::start() {
-    Lock lock(mutex_);
     // guard against concurrent state changes such as closing
-    if (state_ == NotStarted) {
-        state_ = Pending;
-        lock.unlock();
-
+    State state = NotStarted;
+    if (state_.compare_exchange_strong(state, Pending)) {
         grabCnx();
     }
 }
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
         return;
     }
 
-    Lock lock(handler->mutex_);
     State state = handler->state_;
 
     ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
 }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
-    if (handler->state_ == Pending || handler->state_ == Ready) {
+    const auto state = handler->state_.load();
+    if (state == Pending || state == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
         LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index eeb8ebe1c5e..1184746da21 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
         Failed
     };
 
-    State state_;
+    std::atomic<State> state_;
     Backoff backoff_;
     uint64_t epoch_;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..293ece4c838 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -56,7 +56,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
             return;
@@ -84,14 +85,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     (*topicsNeedCreate)--;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
     }
 
     LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
 
     if (topicsNeedCreate->load() == 0) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
@@ -115,7 +117,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    if (state_ == Closed || state_ == Closing) {
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
         return topicPromise->getFuture();
@@ -222,15 +225,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
-    Lock lock(mutex_);
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_INFO(consumerStr_ << " already closed");
-        lock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
     state_ = Closing;
-    lock.unlock();
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
     auto self = shared_from_this();
@@ -254,7 +255,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " subscription - " << subscriptionName_);
     }
@@ -266,7 +267,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
         unAckedMessageTrackerPtr_->clear();
 
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        setState(Closed);
+        state_ = Closed;
         callback(result1);
         return;
     }
@@ -281,7 +282,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         return;
     }
 
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
                                                                            << subscriptionName_);
         callback(ResultAlreadyClosed);
@@ -318,7 +320,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " topicPartitionName - " << topicPartitionName);
     }
@@ -350,7 +352,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
         if (callback) {
@@ -359,7 +362,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    setState(Closing);
+    state_ = Closing;
 
     auto self = shared_from_this();
     int numConsumers = 0;
@@ -373,7 +376,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -395,7 +398,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
     numberTopicPartitions_->fetch_sub(1);
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
                                                                  << result);
     }
@@ -457,18 +460,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    lock.unlock();
     messages_.pop(msg);
 
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -476,19 +475,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
 
-    lock.unlock();
     if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -501,12 +496,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -571,30 +564,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
-void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-}
-
-bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
-                                                 MultiTopicsConsumerState update) {
-    Lock lock(mutex_);
-    if (state_ == expect) {
-        state_ = update;
-        return true;
-    } else {
-        return false;
-    }
-}
-
 void MultiTopicsConsumerImpl::shutdown() {}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool MultiTopicsConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
 
 void MultiTopicsConsumerImpl::receiveMessages() {
     const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -644,12 +618,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -715,11 +688,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    lock.unlock();
 
     return consumers_
         .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 98b2f318af9..d2ede4a8770 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -99,7 +99,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    MultiTopicsConsumerState state_ = Pending;
+    std::atomic<MultiTopicsConsumerState> state_{Pending};
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
@@ -111,9 +111,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
-    void setState(MultiTopicsConsumerState state);
-    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
-
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
     void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..0dc9135be59 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -68,13 +68,10 @@ const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return
 const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; }
 
 Result PartitionedConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
     // See comments in `receive(Message&, int)`
-    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -87,15 +84,9 @@ Result PartitionedConsumerImpl::receive(Message& msg) {
 }
 
 Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
-    // We unlocked `mutex_` here to avoid starvation of methods which are trying to acquire `mutex_`.
-    // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and
-    // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety.
-    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -114,12 +105,10 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -134,29 +123,23 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing");
     // change state to Closing, so that no Ready state operation is permitted during unsubscribe
-    setState(Closing);
+    state_ = Closing;
     // do not accept un subscribe until we have subscribe to all of the partitions of a topic
     // it's a logical single topic so it should behave like a single topic, even if it's sharded
-    Lock lock(mutex_);
-    if (state_ != Ready) {
-        lock.unlock();
-        unsigned int index = 0;
-        for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-             consumer++) {
-            LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_
-                                                 << " for Topic - " << topicName_->toString());
-            (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
-                                                    shared_from_this(), std::placeholders::_1, index++,
-                                                    callback));
-        }
+    unsigned int index = 0;
+    for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+         consumer++) {
+        LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_
+                                             << " for Topic - " << topicName_->toString());
+        (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
+                                                shared_from_this(), std::placeholders::_1, index++,
+                                                callback));
     }
 }
 
 void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex,
                                                      ResultCallback callback) {
-    Lock lock(mutex_);
     if (state_ == Failed) {
-        lock.unlock();
         // we have already informed the client that unsubcribe has failed so, ignore this callbacks
         // or do we still go ahead and check how many could we close successfully?
         LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - "
@@ -164,9 +147,8 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
                   << subscriptionName_ << " for Topic - " << topicName_->toString());
         return;
     }
-    lock.unlock();
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex);
         callback(ResultUnknownError);
         return;
@@ -181,7 +163,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
     unsubscribedSoFar_++;
     if (unsubscribedSoFar_ == numPartitions) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         callback(ResultOk);
         return;
     }
@@ -276,7 +258,6 @@ void PartitionedConsumerImpl::start() {
 void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex) {
     ResultCallback nullCallbackForCleanup = NULL;
-    Lock lock(mutex_);
     if (state_ == Failed) {
         // one of the consumer creation failed, and we are cleaning up
         return;
@@ -286,7 +267,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
     if (result != ResultOk) {
         state_ = Failed;
-        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned consumers
         closeAsync(nullCallbackForCleanup);
@@ -295,12 +275,13 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     }
 
     assert(partitionIndex < numPartitions && partitionIndex >= 0);
+    Lock lock(mutex_);
     numConsumersCreated_++;
+    lock.unlock();
     if (numConsumersCreated_ == numPartitions) {
         LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - "
                                                                    << numPartitions << " Partitions.");
         state_ = Ready;
-        lock.unlock();
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
@@ -312,7 +293,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
 void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex,
                                                                  CloseCallback callback) {
-    Lock lock(mutex_);
     if (state_ == Failed) {
         // we should have already notified the client by callback
         return;
@@ -320,7 +300,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     if (result != ResultOk) {
         state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << partitionIndex);
-        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         if (callback) {
             callback(result);
@@ -328,13 +307,14 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
         return;
     }
     assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
+    Lock lock(mutex_);
     if (numConsumersCreated_ > 0) {
         numConsumersCreated_--;
     }
+    lock.unlock();
     // closed all successfully
     if (!numConsumersCreated_) {
         state_ = Closed;
-        lock.unlock();
         // set the producerCreatedPromise to failure
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
         if (callback) {
@@ -344,11 +324,12 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     }
 }
 void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
+    Lock lock(consumersMutex_);
     if (consumers_.empty()) {
         notifyResult(callback);
         return;
     }
-    setState(Closed);
+    state_ = Closed;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
@@ -376,29 +357,20 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
 void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
     if (closeCallback) {
         // this means client invoked the closeAsync with a valid callback
-        setState(Closed);
+        state_ = Closed;
         closeCallback(ResultOk);
     } else {
         // consumer create failed, closeAsync called to cleanup the successfully created producers
-        setState(Failed);
+        state_ = Failed;
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
     }
 }
 
-void PartitionedConsumerImpl::setState(const PartitionedConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-    lock.unlock();
-}
-
 void PartitionedConsumerImpl::shutdown() {}
 
 bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool PartitionedConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; }
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
@@ -502,9 +474,7 @@ const std::string& PartitionedConsumerImpl::getName() const { return partitionSt
 int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
@@ -513,7 +483,6 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
     LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
     ConsumerList consumerList = consumers_;
-    lock.unlock();
     for (int i = 0; i < consumerList.size(); i++) {
         consumerList[i]->getBrokerConsumerStatsAsync(
             std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, shared_from_this(),
@@ -545,16 +514,13 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
 }
 
 void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
-        stateLock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
 
     // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here
     ConsumerList consumerList = consumers_;
-    stateLock.unlock();
 
     MultiResultCallback multiResultCallback(callback, consumers_.size());
     for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) {
@@ -577,7 +543,6 @@ void PartitionedConsumerImpl::getPartitionMetadata() {
 
 void PartitionedConsumerImpl::handleGetPartitions(Result result,
                                                   const LookupDataResultPtr& lookupDataResult) {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return;
     }
@@ -614,11 +579,9 @@ void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool PartitionedConsumerImpl::isConnected() const {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    stateLock.unlock();
 
     Lock consumersLock(consumersMutex_);
     const auto consumers = consumers_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 7fa0ccdd1f4..8f4faf09954 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -92,7 +92,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     mutable std::mutex consumersMutex_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    PartitionedConsumerState state_ = Pending;
+    std::atomic<PartitionedConsumerState> state_{Pending};
     unsigned int unsubscribedSoFar_ = 0;
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
@@ -109,7 +109,6 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     unsigned int getNumPartitionsWithLock() const;
     ConsumerConfiguration getSinglePartitionConsumerConfig() const;
     ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const;
-    void setState(PartitionedConsumerState state);
     void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback);
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 34e912d4ddc..79ed1969d78 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
         return;
     }
 
-    if (state_ != Ready) {
-        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+    const auto state = state_.load();
+    if (state != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state);
         resetAutoDiscoveryTimer();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e15d388ef64..a446c2b4b25 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -136,13 +136,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
     if (state_ == Closed) {
-        lock.unlock();
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
         return;
     }
-    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
@@ -164,7 +161,6 @@ void ProducerImpl::connectionFailed(Result result) {
         // so don't change the state and allow reconnections
         return;
     } else if (producerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -175,14 +171,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
     // while waiting for this response if using lazy producers
-    Lock lock(mutex_);
-    if (state_ != Ready && state_ != Pending) {
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
         return;
     }
 
     if (result == ResultOk) {
+        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -217,8 +214,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
-        lock.unlock();
-
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -248,7 +243,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
                 failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
-                Lock lock(mutex_);
                 state_ = Failed;
             }
         }
@@ -334,9 +328,8 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
-
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend(callback);
             lock.unlock();
             failures.complete();
@@ -350,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend();
             lock.unlock();
             failures.complete();
@@ -551,8 +544,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
 
     // ignore if the producer is already closing/closed
-    Lock lock(mutex_);
-    if (state_ == Pending || state_ == Ready) {
+    const auto state = state_.load();
+    if (state == Pending || state == Ready) {
+        Lock lock(mutex_);
         auto failures = batchMessageAndSend();
         lock.unlock();
         failures.complete();
@@ -569,11 +563,9 @@ void ProducerImpl::printStats() {
 }
 
 void ProducerImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-
     // if the producer was never started then there is nothing to clean up
-    if (state_ == NotStarted) {
-        state_ = Closed;
+    State expectedState = NotStarted;
+    if (state_.compare_exchange_strong(expectedState, Closed)) {
         callback(ResultOk);
         return;
     }
@@ -586,9 +578,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // ensure any remaining send callbacks are called before calling the close callback
     failPendingMessages(ResultAlreadyClosed, false);
 
-    if (state_ != Ready && state_ != Pending) {
+    // TODO  maybe we need a loop here to implement CAS for a condition,
+    // just like Java's `getAndUpdate` method on an atomic variable
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         state_ = Closed;
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -601,7 +595,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
+
         if (callback) {
             callback(ResultOk);
         }
@@ -615,7 +609,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -623,7 +616,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
@@ -636,7 +628,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
 void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Closed producer");
         ClientConnectionPtr cnx = getCnx().lock();
@@ -659,10 +650,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture()
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
-    Lock lock(mutex_);
-    if (state_ != Pending && state_ != Ready) {
+    const auto state = state_.load();
+    if (state != Pending && state != Ready) {
         return;
     }
+    Lock lock(mutex_);
 
     if (err == boost::asio::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -837,22 +829,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr
     return a->getProducerId() < b->getProducerId();
 }
 
-bool ProducerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ProducerImpl::isClosed() { return state_ == Closed; }
 
-bool ProducerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
-bool ProducerImpl::isStarted() const {
-    Lock lock(mutex_);
-    return state_ != NotStarted;
-}
+bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
 void ProducerImpl::startSendTimeoutTimer() {
     // Initialize the sendTimer only once per producer and only when producer timeout is
     // configured. Set the timeout as configured value and asynchronously wait for the