You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/20 14:45:25 UTC

[pulsar-client-cpp] branch main updated: [fix] Fix memory leak caused by incorrect close and destruction (#54)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 55b4bc9  [fix] Fix memory leak caused by incorrect close and destruction (#54)
55b4bc9 is described below

commit 55b4bc9406f884c3a7a2a7287d098558c12d003b
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 20 22:45:20 2022 +0800

    [fix] Fix memory leak caused by incorrect close and destruction (#54)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/55
    
    ### Motivation
    
    1. When a producer or consumer is closed, the reference is still stored
       in `ClientImpl`. If a client kept creating producers or consumers,
       the memory usage would not reduce.
    2. When the `HandlerBase::connection_` field is modified, the
       `removeProducer` or `removeConsumer` method is not called. Then these
       producers and consumers will be cached in the connection until the
       connection is closed.
    3. The `PartitionedProducerImpl` and `MultiTopicsConsumerImpl` have
       cyclic references, when a `Producer` or `Consumer` instance goes out
       of the scope, the destructors are not called. When I used GDB to
       debug them, I found the reference counts were both greater than 1.
    
    ### Modifications
    
    Let's use "handlers" to represent "producers and consumers".
    
    1. In `ClientImpl`, use `SynchronizedHashMap` to store references of
       handlers, as well as the `cleanupXXX` methods to remove a handler.
    2. Add `HandlerBase::beforeConnectionChange` method, which is called
       before `connection_` is modified. Disallow the access to
      `connection_` from derived classes.
    3. Avoid `shared_from_this()` being passed into callbacks in ASIO
       executors for `PartitionedProducerImpl` and
       `MultiTopicsConsumerImpl`.
    
    This PR also unifies the `shutdown` implementations for handlers and
    call `shutdown` in the destructors.
    1. Cancel the timers
    2. Unregister itself from `ClientImpl` and `ClientConnection`
    3. Set the create future with `ResultAlreadyClosed`
    4. Set the state to `Closed`
    
    It's called when:
    - the destructor is called
    - `closeAsync` is completed
    - `unsubscribeAsync` is completed with ResultOk
    
    ### Verifications
    
    `ShutdownTest` is added to verify the following cases:
    - a single topic
    - a partitioned topic (multiple topics)
    - a partitioned topic with regex subscription
    
    `testClose` verifies `shutdown` when `closeAsync` and `unsubscribeAsync`
    are called. `testDestructor` verifies `shutdown` when handlers go out of
    the scope and the destructors are called.
---
 .github/workflows/ci-pr-validation.yaml |   2 +-
 lib/ClientConnection.h                  |   2 +-
 lib/ClientImpl.cc                       |  89 +++++++------
 lib/ClientImpl.h                        |  13 +-
 lib/ConnectionPool.h                    |   4 +-
 lib/ConsumerImpl.cc                     | 125 +++++++++---------
 lib/ConsumerImpl.h                      |   5 +-
 lib/HandlerBase.cc                      |  24 ++--
 lib/HandlerBase.h                       |  20 ++-
 lib/MultiTopicsConsumerImpl.cc          | 217 ++++++++++++++++++++++----------
 lib/MultiTopicsConsumerImpl.h           |  11 +-
 lib/PartitionedProducerImpl.cc          |  83 ++++++++----
 lib/PartitionedProducerImpl.h           |   5 +-
 lib/PatternMultiTopicsConsumerImpl.cc   |  19 +--
 lib/PatternMultiTopicsConsumerImpl.h    |   1 +
 lib/PeriodicTask.cc                     |   5 +-
 lib/PeriodicTask.h                      |   2 +-
 lib/ProducerImpl.cc                     |  90 ++++++-------
 lib/ProducerImpl.h                      |   5 +-
 lib/SynchronizedHashMap.h               |  18 +--
 tests/ClientTest.cc                     |  28 ++---
 tests/PulsarFriend.h                    |  35 +++++-
 tests/ShutdownTest.cc                   | 121 ++++++++++++++++++
 tests/WaitUtils.h                       |  43 -------
 24 files changed, 610 insertions(+), 357 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 1485708..bd47a81 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -68,7 +68,7 @@ jobs:
         run: ./pulsar-test-service-start.sh
 
       - name: Run unit tests
-        run: ./run-unit-tests.sh
+        run: RETRY_FAILED=3 ./run-unit-tests.sh
 
       - name: Stop Pulsar service
         run: ./pulsar-test-service-stop.sh
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 418a583..8a48408 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
-    std::mutex mutex_;
+    mutable std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 
     // Pending buffers to write on the socket
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 29e92f3..025727a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
 void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                        CreateProducerCallback callback, ProducerImplBasePtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
-        producers_.push_back(producer);
-        lock.unlock();
+        auto pair = producers_.emplace(producer.get(), producer);
+        if (!pair.second) {
+            auto existingProducer = pair.first->second.lock();
+            LOG_ERROR("Unexpected existing producer at the same address: "
+                      << pair.first->first << ", producer: "
+                      << (existingProducer ? existingProducer->getProducerName() : "(null)"));
+            callback(ResultUnknownError, {});
+            return;
+        }
         callback(result, Producer(producer));
     } else {
         callback(result, {});
@@ -241,9 +247,18 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
     ConsumerImplBasePtr consumer = reader->getConsumer().lock();
     auto self = shared_from_this();
     reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
-        Lock lock(mutex_);
-        consumers_.push_back(weakConsumerPtr);
-        lock.unlock();
+        auto consumer = weakConsumerPtr.lock();
+        if (consumer) {
+            auto pair = consumers_.emplace(consumer.get(), consumer);
+            if (!pair.second) {
+                auto existingConsumer = pair.first->second.lock();
+                LOG_ERROR("Unexpected existing consumer at the same address: "
+                          << pair.first->first
+                          << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+            }
+        } else {
+            LOG_ERROR("Unexpected case: the consumer is somehow expired");
+        }
     });
 }
 
@@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
 void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                        SubscribeCallback callback, ConsumerImplBasePtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
-        consumers_.push_back(consumer);
-        lock.unlock();
+        auto pair = consumers_.emplace(consumer.get(), consumer);
+        if (!pair.second) {
+            auto existingConsumer = pair.first->second.lock();
+            LOG_ERROR("Unexpected existing consumer at the same address: "
+                      << pair.first->first
+                      << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+            callback(ResultUnknownError, {});
+            return;
+        }
         callback(result, Consumer(consumer));
     } else {
         callback(result, {});
@@ -477,27 +498,26 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
 }
 
 void ClientImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-    ProducersList producers(producers_);
-    ConsumersList consumers(consumers_);
-
-    if (state_ != Open && callback) {
-        lock.unlock();
-        callback(ResultAlreadyClosed);
+    if (state_ != Open) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
         return;
     }
     // Set the state to Closing so that no producers could get added
     state_ = Closing;
-    lock.unlock();
 
     memoryLimitController_.close();
 
+    auto producers = producers_.move();
+    auto consumers = consumers_.move();
+
     SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
     LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
                                            << " consumers");
 
-    for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
-        ProducerImplBasePtr producer = it->lock();
+    for (auto&& kv : producers) {
+        ProducerImplBasePtr producer = kv.second.lock();
         if (producer && !producer->isClosed()) {
             producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                            std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -507,8 +527,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
         }
     }
 
-    for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
-        ConsumerImplBasePtr consumer = it->lock();
+    for (auto&& kv : consumers) {
+        ConsumerImplBasePtr consumer = kv.second.lock();
         if (consumer && !consumer->isClosed()) {
             consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                            std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -562,23 +582,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
 }
 
 void ClientImpl::shutdown() {
-    Lock lock(mutex_);
-    ProducersList producers;
-    ConsumersList consumers;
+    auto producers = producers_.move();
+    auto consumers = consumers_.move();
 
-    producers.swap(producers_);
-    consumers.swap(consumers_);
-    lock.unlock();
-
-    for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
-        ProducerImplBasePtr producer = it->lock();
+    for (auto&& kv : producers) {
+        ProducerImplBasePtr producer = kv.second.lock();
         if (producer) {
             producer->shutdown();
         }
     }
 
-    for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
-        ConsumerImplBasePtr consumer = it->lock();
+    for (auto&& kv : consumers) {
+        ConsumerImplBasePtr consumer = kv.second.lock();
         if (consumer) {
             consumer->shutdown();
         }
@@ -631,26 +646,24 @@ uint64_t ClientImpl::newRequestId() {
 }
 
 uint64_t ClientImpl::getNumberOfProducers() {
-    Lock lock(mutex_);
     uint64_t numberOfAliveProducers = 0;
-    for (const auto& producer : producers_) {
+    producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
         const auto& producerImpl = producer.lock();
         if (producerImpl) {
             numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
         }
-    }
+    });
     return numberOfAliveProducers;
 }
 
 uint64_t ClientImpl::getNumberOfConsumers() {
-    Lock lock(mutex_);
     uint64_t numberOfAliveConsumers = 0;
-    for (const auto& consumer : consumers_) {
+    consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
         const auto consumerImpl = consumer.lock();
         if (consumerImpl) {
             numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
         }
-    }
+    });
     return numberOfAliveConsumers;
 }
 
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index e8e7708..50ddeff 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -31,6 +31,7 @@
 #include <atomic>
 #include <vector>
 #include "ServiceNameResolver.h"
+#include "SynchronizedHashMap.h"
 
 namespace pulsar {
 
@@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
     LookupServicePtr getLookup();
+
+    void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
+
+    void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }
+
     friend class PulsarFriend;
 
    private:
@@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     uint64_t consumerIdGenerator_;
     uint64_t requestIdGenerator_;
 
-    typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
-    ProducersList producers_;
-
-    typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
-    ConsumersList consumers_;
+    SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
+    SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
 
     std::atomic<Result> closingError;
 
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 21d439e..996df54 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -74,10 +74,10 @@ class PULSAR_PUBLIC ConnectionPool {
     typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
     PoolMap pool_;
     bool poolConnections_;
-    std::mutex mutex_;
+    mutable std::mutex mutex_;
     std::atomic_bool closed_{false};
 
-    friend class ConnectionPoolTest;
+    friend class PulsarFriend;
 };
 }  // namespace pulsar
 #endif  //_PULSAR_CONNECTION_POOL_HEADER_
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 54e346f..7be5a6a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -104,7 +104,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
 
 ConsumerImpl::~ConsumerImpl() {
     LOG_DEBUG(getName() << "~ConsumerImpl");
-    incomingMessages_.clear();
     if (state_ == Ready) {
         // this could happen at least in this condition:
         //      consumer seek, caused reconnection, if consumer close happened before connection ready,
@@ -121,6 +120,7 @@ ConsumerImpl::~ConsumerImpl() {
             LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
         }
     }
+    shutdown();
 }
 
 void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; }
@@ -156,6 +156,8 @@ void ConsumerImpl::start() {
     ackGroupingTrackerPtr_->start();
 }
 
+void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { cnx.removeConsumer(consumerId_); }
+
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
@@ -220,7 +222,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
         LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
         {
             Lock lock(mutex_);
-            connection_ = cnx;
+            setCnx(cnx);
             incomingMessages_.clear();
             state_ = Ready;
             backoff_.reset();
@@ -267,13 +269,24 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
     }
 }
 
-void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     LOG_INFO(getName() << "Unsubscribing");
 
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+            LOG_INFO(getName() << "Unsubscribed successfully");
+        } else {
+            state_ = Ready;
+            LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     if (state_ != Ready) {
         callback(ResultAlreadyClosed);
-        LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
-                               "then call unsubscribe");
         return;
     }
 
@@ -286,9 +299,9 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
         lock.unlock();
         int requestId = client->newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
+        auto self = get_shared_this_ptr();
         cnx->sendRequestWithId(cmd, requestId)
-            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
-                                   std::placeholders::_1, callback));
+            .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
     } else {
         Result result = ResultNotConnected;
         lock.unlock();
@@ -297,16 +310,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     }
 }
 
-void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
-    if (result == ResultOk) {
-        state_ = Closed;
-        LOG_INFO(getName() << "Unsubscribed successfully");
-    } else {
-        LOG_WARN(getName() << "Failed to unsubscribe: " << strResult(result));
-    }
-    callback(result);
-}
-
 Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                          const proto::MessageMetadata& metadata,
                                                          const MessageId& messageId,
@@ -990,20 +993,25 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
 
 void ConsumerImpl::disconnectConsumer() {
     LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
-    Lock lock(mutex_);
-    connection_.reset();
-    lock.unlock();
+    resetCnx();
     scheduleReconnection(get_shared_this_ptr());
 }
 
-void ConsumerImpl::closeAsync(ResultCallback callback) {
-    // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = get_shared_this_ptr();
+void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        shutdown();
+        if (result == ResultOk) {
+            LOG_INFO(getName() << "Closed consumer " << consumerId_);
+        } else {
+            LOG_WARN(getName() << "Failed to close consumer: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
 
     if (state_ != Ready) {
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
@@ -1018,66 +1026,40 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
-        state_ = Closed;
         // If connection is gone, also the consumer is closed on the broker side
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     ClientImplPtr client = client_.lock();
     if (!client) {
-        state_ = Closed;
         // Client was already destroyed
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
-    int requestId = client->newRequestId();
-    Future<Result, ResponseData> future =
-        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
-    if (callback) {
-        // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
-                                     callback, ptr));
-    }
+    cancelTimers();
 
-    // fail pendingReceive callback
-    failPendingReceiveCallback();
-    failPendingBatchReceiveCallback();
-
-    // cancel timer
-    batchReceiveTimer_->cancel();
-}
-
-void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
-    if (result == ResultOk) {
-        state_ = Closed;
-
-        ClientConnectionPtr cnx = getCnx().lock();
-        if (cnx) {
-            cnx->removeConsumer(consumerId_);
-        }
-
-        LOG_INFO(getName() << "Closed consumer " << consumerId_);
-    } else {
-        LOG_ERROR(getName() << "Failed to close consumer: " << result);
-    }
-
-    if (callback) {
-        callback(result);
-    }
+    int requestId = client->newRequestId();
+    auto self = get_shared_this_ptr();
+    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
 }
 
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    state_ = Closed;
-
+    incomingMessages_.clear();
+    resetCnx();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupConsumer(this);
+    }
+    cancelTimers();
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+    state_ = Closed;
 }
 
 bool ConsumerImpl::isClosed() { return state_ == Closed; }
@@ -1437,4 +1419,9 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
     return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this());
 }
 
+void ConsumerImpl::cancelTimers() noexcept {
+    boost::system::error_code ec;
+    batchReceiveTimer_->cancel(ec);
+}
+
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 09d2c5c..3aa632a 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -84,7 +84,6 @@ class ConsumerImpl : public ConsumerImplBase {
     void activeConsumerChanged(bool isActive);
     inline proto::CommandSubscribe_SubType getSubType();
     inline proto::CommandSubscribe_InitialPosition getInitialPosition();
-    void handleUnsubscribe(Result result, ResultCallback callback);
 
     /**
      * Send individual ACK request of given message ID to broker.
@@ -140,6 +139,7 @@ class ConsumerImpl : public ConsumerImplBase {
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
+    void beforeConnectionChange(ClientConnection& cnx) override;
 
    protected:
     // overrided methods from HandlerBase
@@ -156,7 +156,8 @@ class ConsumerImpl : public ConsumerImplBase {
 
     void internalConsumerChangeListener(bool isActive);
 
-    void handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer);
+    void cancelTimers() noexcept;
+
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 506207e..1f4ce6e 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,7 +30,6 @@ namespace pulsar {
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
     : client_(client),
       topic_(topic),
-      connection_(),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -50,14 +49,25 @@ void HandlerBase::start() {
     }
 }
 
+ClientConnectionWeakPtr HandlerBase::getCnx() const {
+    Lock lock(connectionMutex_);
+    return connection_;
+}
+
+void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
+    Lock lock(connectionMutex_);
+    auto previousCnx = connection_.lock();
+    if (previousCnx) {
+        beforeConnectionChange(*previousCnx);
+    }
+    connection_ = cnx;
+}
+
 void HandlerBase::grabCnx() {
-    Lock lock(mutex_);
-    if (connection_.lock()) {
-        lock.unlock();
+    if (getCnx().lock()) {
         LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
         return;
     }
-    lock.unlock();
     LOG_INFO(getName() << "Getting connection from pool");
     ClientImplPtr client = client_.lock();
     Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
@@ -96,14 +106,14 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
 
     State state = handler->state_;
 
-    ClientConnectionPtr currentConnection = handler->connection_.lock();
+    ClientConnectionPtr currentConnection = handler->getCnx().lock();
     if (currentConnection && connection.lock().get() != currentConnection.get()) {
         LOG_WARN(handler->getName()
                  << "Ignoring connection closed since we are already attached to a newer connection");
         return;
     }
 
-    handler->connection_.reset();
+    handler->resetCnx();
 
     if (result == ResultRetryable) {
         scheduleReconnection(handler);
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 6fc3603..6616ec4 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -44,11 +44,9 @@ class HandlerBase {
 
     void start();
 
-    /*
-     * get method for derived class to access weak ptr to connection so that they
-     * have to check if they can get a shared_ptr out of it or not
-     */
-    ClientConnectionWeakPtr getCnx() const { return connection_; }
+    ClientConnectionWeakPtr getCnx() const;
+    void setCnx(const ClientConnectionPtr& cnx);
+    void resetCnx() { setCnx(nullptr); }
 
    protected:
     /*
@@ -65,6 +63,14 @@ class HandlerBase {
      * Should we retry in error that are transient
      */
     bool isRetriableError(Result result);
+
+    /**
+     * Do some cleanup work before changing `connection_` to `cnx`.
+     *
+     * @param cnx the current connection
+     */
+    virtual void beforeConnectionChange(ClientConnection& cnx) = 0;
+
     /*
      * connectionOpened will be implemented by derived class to receive notification
      */
@@ -86,7 +92,6 @@ class HandlerBase {
    protected:
     ClientImplWeakPtr client_;
     const std::string topic_;
-    ClientConnectionWeakPtr connection_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
@@ -112,6 +117,9 @@ class HandlerBase {
 
    private:
     DeadlineTimerPtr timer_;
+
+    mutable std::mutex connectionMutex_;
+    ClientConnectionWeakPtr connection_;
     friend class ClientConnection;
     friend class PulsarFriend;
 };
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 573c33d..c54f8e8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -19,6 +19,7 @@
 #include "MultiTopicsConsumerImpl.h"
 #include "MultiResultCallback.h"
 #include "MessagesImpl.h"
+#include <stdexcept>
 
 DECLARE_LOG_OBJECT()
 
@@ -55,11 +56,11 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
-    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
     if (partitionsUpdateInterval > 0) {
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client_->getLookup();
+        lookupServicePtr_ = client->getLookup();
     }
 
     state_ = Pending;
@@ -83,10 +84,16 @@ void MultiTopicsConsumerImpl::start() {
     int topicsNumber = topics_.size();
     std::shared_ptr<std::atomic<int>> topicsNeedCreate = std::make_shared<std::atomic<int>>(topicsNumber);
     // subscribe for each passed in topic
+    auto weakSelf = weak_from_this();
     for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
-        subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
-                                                           get_shared_this_ptr(), std::placeholders::_1,
-                                                           std::placeholders::_2, *itr, topicsNeedCreate));
+        auto topic = *itr;
+        subscribeOneTopicAsync(topic).addListener(
+            [this, weakSelf, topic, topicsNeedCreate](Result result, const Consumer& consumer) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    handleOneTopicSubscribed(result, consumer, topic, topicsNeedCreate);
+                }
+            });
     }
 }
 
@@ -111,9 +118,9 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            // It's safe to capture only this here, because the callback can be called only when this is valid
-            closeAsync(
-                [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
+            // `shutdown()`, which set multiTopicsConsumerCreatedPromise_ with `failedResult`, will be called
+            // when `closeAsync` completes.
+            closeAsync(nullptr);
         }
     }
 }
@@ -164,10 +171,20 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                        ConsumerSubResultPromisePtr topicSubResultPromise) {
     std::shared_ptr<ConsumerImpl> consumer;
     ConsumerConfiguration config = conf_.clone();
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
+    auto client = client_.lock();
+    if (!client) {
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        return;
+    }
+    ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
 
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
-                                        std::placeholders::_1, std::placeholders::_2));
+    auto weakSelf = weak_from_this();
+    config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+        auto self = weakSelf.lock();
+        if (self) {
+            messageReceived(consumer, msg);
+        }
+    });
 
     int partitions = numPartitions == 0 ? 1 : numPartitions;
 
@@ -186,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     // non-partitioned topic
     if (numPartitions == 0) {
         // We don't have to add partition-n suffix
-        consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
+        consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
                                                   topicName->isPersistent(), internalListenerExecutor, true,
                                                   NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -199,7 +216,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     } else {
         for (int i = 0; i < numPartitions; i++) {
             std::string topicPartitionName = topicName->getTopicPartitionName(i);
-            consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+            consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
                                                       topicName->isPersistent(), internalListenerExecutor,
                                                       true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -244,12 +261,24 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
     }
 }
 
-void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+            LOG_INFO(getName() << "Unsubscribed successfully");
+        } else {
+            state_ = Ready;
+            LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     const auto state = state_.load();
     if (state == Closing || state == Closed) {
-        LOG_INFO(consumerStr_ << " already closed");
         callback(ResultAlreadyClosed);
         return;
     }
@@ -284,12 +313,9 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
 
     if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
-        consumers_.clear();
-        topicsPartitions_.clear();
-        unAckedMessageTrackerPtr_->clear();
-
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        state_ = Closed;
+        // The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
+        // unsubscribe succeeds.
         callback(result1);
         return;
     }
@@ -376,20 +402,27 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     }
 }
 
-void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        shutdown();
+        if (result != ResultOk) {
+            LOG_WARN(getName() << "Failed to close consumer: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
     const auto state = state_.load();
     if (state == Closing || state == Closed) {
-        LOG_ERROR("TopicsConsumer already closed "
-                  << " topic" << topic_ << " consumer - " << consumerStr_);
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
     state_ = Closing;
 
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    cancelTimers();
+
+    auto weakSelf = weak_from_this();
     int numConsumers = 0;
     consumers_.clear(
         [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -418,27 +451,14 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
                 }
                 // closed all consumers
                 if (numConsumersLeft == 0) {
-                    incomingMessages_.clear();
-                    topicsPartitions_.clear();
-                    unAckedMessageTrackerPtr_->clear();
-
-                    if (state_ != Failed) {
-                        state_ = Closed;
-                    }
-
-                    if (callback) {
-                        callback(result);
-                    }
+                    callback(result);
                 }
             });
         });
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        state_ = Closed;
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
@@ -461,8 +481,13 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         lock.unlock();
-        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        auto weakSelf = weak_from_this();
+        listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+            auto self = weakSelf.lock();
+            if (self) {
+                notifyPendingReceivedCallback(ResultOk, msg, callback);
+            }
+        });
         return;
     }
 
@@ -564,13 +589,18 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
+        auto weakSelf = weak_from_this();
+        listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+            auto self = weakSelf.lock();
+            if (self) {
+                notifyPendingReceivedCallback(ResultAlreadyClosed, msg, callback);
+            }
+        });
     }
     lock.unlock();
 }
 
-void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const Message& msg,
                                                             const ReceiveCallback& callback) {
     if (result == ResultOk) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -609,7 +639,7 @@ void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
     }
 }
 
-MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
 
 Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
     return multiTopicsConsumerCreatedPromise_.getFuture();
@@ -620,7 +650,24 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
-void MultiTopicsConsumerImpl::shutdown() {}
+void MultiTopicsConsumerImpl::shutdown() {
+    cancelTimers();
+    incomingMessages_.clear();
+    topicsPartitions_.clear();
+    unAckedMessageTrackerPtr_->clear();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupConsumer(this);
+    }
+    consumers_.clear();
+    topicsPartitions_.clear();
+    if (failedResult != ResultOk) {
+        multiTopicsConsumerCreatedPromise_.setFailed(failedResult);
+    } else {
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    }
+    state_ = Closed;
+}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
@@ -684,13 +731,16 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
     lock.unlock();
 
-    auto self = get_shared_this_ptr();
     size_t i = 0;
-    consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
+    consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
         size_t index = i++;
+        auto weakSelf = weak_from_this();
         consumer->getBrokerConsumerStatsAsync(
-            [self, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
-                self->handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+            [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+                }
             });
     });
 }
@@ -772,7 +822,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
 }
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
         // cannot continue at this time, and the request needs to be ignored.
@@ -790,9 +840,15 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
     for (const auto& item : topicsPartitions) {
         auto topicName = TopicName::get(item.first);
         auto currentNumPartitions = item.second;
+        auto weakSelf = weak_from_this();
         lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
-                      std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
+            [this, weakSelf, topicName, currentNumPartitions](Result result,
+                                                              const LookupDataResultPtr& lookupDataResult) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    this->handleGetPartitions(topicName, result, lookupDataResult, currentNumPartitions);
+                }
+            });
     }
 }
 void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
@@ -831,9 +887,19 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     ConsumerSubResultPromisePtr topicSubResultPromise,
     std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
     ConsumerConfiguration config = conf_.clone();
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
-                                        std::placeholders::_1, std::placeholders::_2));
+    auto client = client_.lock();
+    if (!client) {
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        return;
+    }
+    ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
+    auto weakSelf = weak_from_this();
+    config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+        auto self = weakSelf.lock();
+        if (self) {
+            messageReceived(consumer, msg);
+        }
+    });
 
     // Apply total limit of receiver queue size across partitions
     config.setReceiverQueueSize(
@@ -842,12 +908,18 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
 
     std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);
 
-    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+    auto consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
                                                    topicName->isPersistent(), internalListenerExecutor, true,
                                                    Partitioned);
     consumer->getConsumerCreatedFuture().addListener(
-        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
-                  std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+        [this, weakSelf, partitionsNeedCreate, topicSubResultPromise](
+            Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) {
+            auto self = weakSelf.lock();
+            if (self) {
+                handleSingleConsumerCreated(result, consumerImplBaseWeakPtr, partitionsNeedCreate,
+                                            topicSubResultPromise);
+            }
+        });
     consumer->setPartitionIndex(partitionIndex);
     consumer->start();
     consumers_.emplace(topicPartitionName, consumer);
@@ -873,9 +945,13 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
         messageProcessed(peekMsg);
         messages->add(peekMsg);
     }
-    auto self = get_shared_this_ptr();
-    listenerExecutor_->postWork(
-        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+    auto weakSelf = weak_from_this();
+    listenerExecutor_->postWork([weakSelf, callback, messages]() {
+        auto self = weakSelf.lock();
+        if (self) {
+            callback(ResultOk, messages->getMessageList());
+        }
+    });
 }
 
 void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
@@ -886,3 +962,14 @@ void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
 std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
     return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
 }
+
+void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
+    throw std::runtime_error("The connection_ field should not be modified for a MultiTopicsConsumerImpl");
+}
+
+void MultiTopicsConsumerImpl::cancelTimers() noexcept {
+    if (partitionsUpdateTimer_) {
+        boost::system::error_code ec;
+        partitionsUpdateTimer_->cancel(ec);
+    }
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 044f417..7c83da9 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -87,7 +87,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
 
    protected:
-    const ClientImplPtr client_;
+    const ClientImplWeakPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
     const ConsumerConfiguration conf_;
@@ -118,7 +118,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
-    void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
+    void notifyPendingReceivedCallback(Result result, const Message& message,
+                                       const ReceiveCallback& callback);
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                   std::shared_ptr<std::atomic<int>> topicsNeedCreate);
@@ -142,10 +143,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     // impl consumer base virtual method
     bool hasEnoughMessagesForBatchReceive() const override;
     void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
+    void beforeConnectionChange(ClientConnection& cnx) override;
 
    private:
     std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
+    void cancelTimers() noexcept;
+
+    std::weak_ptr<MultiTopicsConsumerImpl> weak_from_this() noexcept {
+        return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
+    }
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 469ecc9..3d383ff 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -46,12 +46,12 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
                  (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
     conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
 
-    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
     if (partitionsUpdateInterval > 0) {
-        listenerExecutor_ = client_->getListenerExecutorProvider()->get();
+        listenerExecutor_ = client->getListenerExecutorProvider()->get();
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client_->getLookup();
+        lookupServicePtr_ = client->getLookup();
     }
 }
 
@@ -71,7 +71,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
     }
 }
 
-PartitionedProducerImpl::~PartitionedProducerImpl() {}
+PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }
 // override
 const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
 
@@ -86,7 +86,11 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
 
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
     using namespace std::placeholders;
-    auto producer = std::make_shared<ProducerImpl>(client_, *topicName_, conf_, partition);
+    auto client = client_.lock();
+    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
+    if (!client) {
+        return producer;
+    }
 
     if (lazy) {
         createLazyPartitionProducer(partition);
@@ -211,7 +215,15 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
 }
 
 // override
-void PartitionedProducerImpl::shutdown() { state_ = Closed; }
+void PartitionedProducerImpl::shutdown() {
+    cancelTimers();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupProducer(this);
+    }
+    partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    state_ = Closed;
+}
 
 const std::string& PartitionedProducerImpl::getProducerName() const {
     Lock producersLock(producersMutex_);
@@ -239,11 +251,25 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
  * if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to
  * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
  */
-void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
-    if (state_ == Closing || state_ == Closed) {
+void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
+    auto closeCallback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+    if (state_ == Closed) {
+        closeCallback(ResultAlreadyClosed);
+        return;
+    }
+    State expectedState = Ready;
+    if (!state_.compare_exchange_strong(expectedState, Closing)) {
         return;
     }
-    state_ = Closing;
+
+    cancelTimers();
 
     unsigned int producerAlreadyClosed = 0;
 
@@ -271,12 +297,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
      * c. If closeAsync called due to failure in creating just one sub producer then state is set by
      * handleSinglePartitionProducerCreated
      */
-    if (producerAlreadyClosed == numProducers && closeCallback) {
-        state_ = Closed;
+    if (producerAlreadyClosed == numProducers) {
         closeCallback(ResultOk);
     }
 }
 
+// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
 void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
                                                                  const unsigned int partitionIndex,
                                                                  CloseCallback callback) {
@@ -285,11 +311,9 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
         return;
     }
     if (result != ResultOk) {
-        state_ = Failed;
         LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
-        if (callback) {
-            callback(result);
-        }
+        callback(result);
+        state_ = Failed;
         return;
     }
     assert(partitionIndex < getNumPartitionsWithLock());
@@ -298,16 +322,13 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
     }
     // closed all successfully
     if (!numProducersCreated_) {
-        state_ = Closed;
         // set the producerCreatedPromise to failure, if client called
         // closeAsync and it's not failure to create producer, the promise
         // is set second time here, first time it was successful. So check
         // if there's any adverse effect of setting it again. It should not
         // be but must check. MUSTCHECK changeme
         partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
+        callback(result);
         return;
     }
 }
@@ -371,15 +392,26 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
 }
 
 void PartitionedProducerImpl::runPartitionUpdateTask() {
+    auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    partitionsUpdateTimer_->async_wait(
-        std::bind(&PartitionedProducerImpl::getPartitionMetadata, shared_from_this()));
+    partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+        auto self = weakSelf.lock();
+        if (self) {
+            self->getPartitionMetadata();
+        }
+    });
 }
 
 void PartitionedProducerImpl::getPartitionMetadata() {
     using namespace std::placeholders;
+    auto weakSelf = weak_from_this();
     lookupServicePtr_->getPartitionMetadataAsync(topicName_)
-        .addListener(std::bind(&PartitionedProducerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+        .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) {
+            auto self = weakSelf.lock();
+            if (self) {
+                self->handleGetPartitions(result, lookupDataResult);
+            }
+        });
 }
 
 void PartitionedProducerImpl::handleGetPartitions(Result result,
@@ -446,4 +478,11 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
     return numberOfConnectedProducer;
 }
 
+void PartitionedProducerImpl::cancelTimers() noexcept {
+    if (partitionsUpdateTimer_) {
+        boost::system::error_code ec;
+        partitionsUpdateTimer_->cancel(ec);
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 0a8c10e..cc7a4e0 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -73,10 +73,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     void notifyResult(CloseCallback closeCallback);
 
+    std::weak_ptr<PartitionedProducerImpl> weak_from_this() noexcept { return shared_from_this(); }
+
     friend class PulsarFriend;
 
    private:
-    const ClientImplPtr client_;
+    ClientImplWeakPtr client_;
 
     const TopicNamePtr topicName_;
     const std::string topic_;
@@ -119,6 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
+    void cancelTimers() noexcept;
 };
 
 }  // namespace pulsar
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 79ed196..8014078 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -32,7 +32,7 @@ PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr cli
                               lookupServicePtr_),
       patternString_(pattern),
       pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
-      autoDiscoveryTimer_(),
+      autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
       autoDiscoveryRunning_(false) {
     namespaceName_ = TopicName::get(pattern)->getNamespaceName();
 }
@@ -215,9 +215,7 @@ void PatternMultiTopicsConsumerImpl::start() {
 
     LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
 
-    // Init autoDiscoveryTimer task only once, wait for the timeout to happen
-    if (!autoDiscoveryTimer_ && conf_.getPatternAutoDiscoveryPeriod() > 0) {
-        autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+    if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
         autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
         autoDiscoveryTimer_->async_wait(
             std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
@@ -225,13 +223,16 @@ void PatternMultiTopicsConsumerImpl::start() {
 }
 
 void PatternMultiTopicsConsumerImpl::shutdown() {
-    Lock lock(mutex_);
-    state_ = Closed;
-    autoDiscoveryTimer_->cancel();
-    multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    cancelTimers();
+    MultiTopicsConsumerImpl::shutdown();
 }
 
 void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    cancelTimers();
     MultiTopicsConsumerImpl::closeAsync(callback);
-    autoDiscoveryTimer_->cancel();
+}
+
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
+    boost::system::error_code ec;
+    autoDiscoveryTimer_->cancel(ec);
 }
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index 408d68e..448f2e3 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -72,6 +72,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
     bool autoDiscoveryRunning_;
     NamespaceNamePtr namespaceName_;
 
+    void cancelTimers() noexcept;
     void resetAutoDiscoveryTimer();
     void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
     void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback);
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 4e91ef5..65bdf23 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -38,12 +38,13 @@ void PeriodicTask::start() {
     }
 }
 
-void PeriodicTask::stop() {
+void PeriodicTask::stop() noexcept {
     State state = Ready;
     if (!state_.compare_exchange_strong(state, Closing)) {
         return;
     }
-    timer_.cancel();
+    ErrorCode ec;
+    timer_.cancel(ec);
     state_ = Pending;
 }
 
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 57d0734..159c86a 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -55,7 +55,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
 
     void start();
 
-    void stop();
+    void stop() noexcept;
 
     void setCallback(CallbackType callback) noexcept { callback_ = callback; }
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 20133c5..e228c83 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -109,7 +109,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
 
 ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
-    cancelTimers();
+    shutdown();
     printStats();
     if (state_ == Ready || state_ == Pending) {
         LOG_WARN(getName() << "Destroyed producer which was not properly closed");
@@ -124,6 +124,10 @@ int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished
 
 const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
 
+void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
+    connection.removeProducer(producerId_);
+}
+
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
@@ -185,7 +189,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
         }
         resendMessages(cnx);
-        connection_ = cnx;
+        setCnx(cnx);
         state_ = Ready;
         backoff_.reset();
         lock.unlock();
@@ -645,7 +649,19 @@ void ProducerImpl::printStats() {
     }
 }
 
-void ProducerImpl::closeAsync(CloseCallback callback) {
+void ProducerImpl::closeAsync(CloseCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            LOG_INFO(getName() << "Closed producer " << producerId_);
+            shutdown();
+        } else {
+            LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     // if the producer was never started then there is nothing to clean up
     State expectedState = NotStarted;
     if (state_.compare_exchange_strong(expectedState, Closed)) {
@@ -653,9 +669,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    // Keep a reference to ensure object is kept alive
-    ProducerImplPtr ptr = shared_from_this();
-
     cancelTimers();
 
     if (semaphore_) {
@@ -669,10 +682,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // just like Java's `getAndUpdate` method on an atomic variable
     const auto state = state_.load();
     if (state != Ready && state != Pending) {
-        state_ = Closed;
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
 
         return;
     }
@@ -681,53 +691,24 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
-        state_ = Closed;
-
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     // Detach the producer from the connection to avoid sending any other
     // message from the producer
-    connection_.reset();
+    resetCnx();
 
     ClientImplPtr client = client_.lock();
     if (!client) {
-        state_ = Closed;
-        // Client was already destroyed
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     int requestId = client->newRequestId();
-    Future<Result, ResponseData> future =
-        cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
-    if (callback) {
-        // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(
-            std::bind(&ProducerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
-    }
-}
-
-void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
-    if (result == ResultOk) {
-        state_ = Closed;
-        LOG_INFO(getName() << "Closed producer " << producerId_);
-        ClientConnectionPtr cnx = getCnx().lock();
-        if (cnx) {
-            cnx->removeProducer(producerId_);
-        }
-    } else {
-        LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
-    }
-
-    if (callback) {
-        callback(result);
-    }
+    auto self = shared_from_this();
+    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
+        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
 }
 
 Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() {
@@ -868,9 +849,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
 
 void ProducerImpl::disconnectProducer() {
     LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
-    Lock lock(mutex_);
-    connection_.reset();
-    lock.unlock();
+    resetCnx();
     scheduleReconnection(shared_from_this());
 }
 
@@ -885,16 +864,21 @@ void ProducerImpl::start() {
 }
 
 void ProducerImpl::shutdown() {
-    Lock lock(mutex_);
-    state_ = Closed;
+    resetCnx();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupProducer(this);
+    }
     cancelTimers();
     producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    state_ = Closed;
 }
 
-void ProducerImpl::cancelTimers() {
+void ProducerImpl::cancelTimers() noexcept {
     dataKeyRefreshTask_.stop();
-    batchTimer_.cancel();
-    sendTimer_.cancel();
+    boost::system::error_code ec;
+    batchTimer_.cancel(ec);
+    sendTimer_.cancel(ec);
 }
 
 bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 74eee61..0559515 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -109,6 +109,7 @@ class ProducerImpl : public HandlerBase,
     friend class BatchMessageContainer;
 
     // overrided methods from HandlerBase
+    void beforeConnectionChange(ClientConnection& connection) override;
     void connectionOpened(const ClientConnectionPtr& connection) override;
     void connectionFailed(Result result) override;
     HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
@@ -120,8 +121,6 @@ class ProducerImpl : public HandlerBase,
     void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                               const ResponseData& responseData);
 
-    void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
-
     void resendMessages(ClientConnectionPtr cnx);
 
     void refreshEncryptionKey(const boost::system::error_code& ec);
@@ -143,7 +142,7 @@ class ProducerImpl : public HandlerBase,
     void releaseSemaphore(uint32_t payloadSize);
     void releaseSemaphoreForSendOp(const OpSendMsg& op);
 
-    void cancelTimers();
+    void cancelTimers() noexcept;
 
     bool isValidProducerState(const SendCallback& callback) const;
     bool canAddToBatch(const Message& msg) const;
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index 831d1e8..9bed7d7 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -74,12 +74,9 @@ class SynchronizedHashMap {
 
     // clear the map and apply `f` on each removed value
     void clear(std::function<void(const K&, const V&)> f) {
-        Lock lock(mutex_);
-        auto it = data_.begin();
-        while (it != data_.end()) {
-            f(it->first, it->second);
-            auto next = data_.erase(it);
-            it = next;
+        MapType data = move();
+        for (auto&& kv : data) {
+            f(kv.first, kv.second);
         }
     }
 
@@ -131,8 +128,15 @@ class SynchronizedHashMap {
         return data_.size();
     }
 
+    MapType move() noexcept {
+        Lock lock(mutex_);
+        MapType data;
+        data_.swap(data);
+        return data;
+    }
+
    private:
-    std::unordered_map<K, V> data_;
+    MapType data_;
     // Use recursive_mutex to allow methods being called in `forEach`
     mutable MutexType mutex_;
 };
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 216b548..aa48bdc 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -20,7 +20,6 @@
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
-#include "WaitUtils.h"
 
 #include <future>
 #include <pulsar/Client.h>
@@ -198,37 +197,34 @@ TEST(ClientTest, testReferenceCount) {
         Producer producer;
         ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
         ASSERT_EQ(producers.size(), 1);
-        ASSERT_TRUE(producers[0].use_count() > 0);
-        LOG_INFO("Reference count of the producer: " << producers[0].use_count());
+
+        producers.forEachValue([](const ProducerImplBaseWeakPtr &weakProducer) {
+            LOG_INFO("Reference count of producer: " << weakProducer.use_count());
+            ASSERT_FALSE(weakProducer.expired());
+        });
 
         Consumer consumer;
         ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
         ASSERT_EQ(consumers.size(), 1);
-        ASSERT_TRUE(consumers[0].use_count() > 0);
-        LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
 
         ReaderConfiguration readerConf;
         Reader reader;
         ASSERT_EQ(ResultOk,
                   client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
         ASSERT_EQ(consumers.size(), 2);
-        ASSERT_TRUE(consumers[1].use_count() > 0);
-        LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+        consumers.forEachValue([](const ConsumerImplBaseWeakPtr &weakConsumer) {
+            LOG_INFO("Reference count of consumer: " << weakConsumer.use_count());
+            ASSERT_FALSE(weakConsumer.expired());
+        });
 
         readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
         ASSERT_TRUE(readerWeakPtr.use_count() > 0);
         LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
-    ASSERT_EQ(producers.size(), 1);
-    ASSERT_EQ(producers[0].use_count(), 0);
-    ASSERT_EQ(consumers.size(), 2);
-
-    waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
-        return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
-    });
-    EXPECT_EQ(consumers[0].use_count(), 0);
-    EXPECT_EQ(consumers[1].use_count(), 0);
+    EXPECT_EQ(producers.size(), 0);
+    EXPECT_EQ(consumers.size(), 0);
     EXPECT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index d9f9923..df8e3dc 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -98,14 +98,45 @@ class PulsarFriend {
 
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
 
-    static ClientImpl::ProducersList& getProducers(const Client& client) {
+    static auto getProducers(const Client& client) -> decltype(ClientImpl::producers_)& {
         return getClientImplPtr(client)->producers_;
     }
 
-    static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+    static auto getConsumers(const Client& client) -> decltype(ClientImpl::consumers_)& {
         return getClientImplPtr(client)->consumers_;
     }
 
+    static std::vector<ClientConnectionPtr> getConnections(const Client& client) {
+        auto& pool = client.impl_->pool_;
+        std::vector<ClientConnectionPtr> connections;
+        std::lock_guard<std::mutex> lock(pool.mutex_);
+        for (const auto& kv : pool.pool_) {
+            auto cnx = kv.second.lock();
+            if (cnx) {
+                connections.emplace_back(cnx);
+            }
+        }
+        return connections;
+    }
+
+    static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
+        std::vector<ProducerImplPtr> producers;
+        std::lock_guard<std::mutex> lock(cnx.mutex_);
+        for (const auto& kv : cnx.producers_) {
+            producers.emplace_back(kv.second.lock());
+        }
+        return producers;
+    }
+
+    static std::vector<ConsumerImplPtr> getConsumers(const ClientConnection& cnx) {
+        std::vector<ConsumerImplPtr> consumers;
+        std::lock_guard<std::mutex> lock(cnx.mutex_);
+        for (const auto& kv : cnx.consumers_) {
+            consumers.emplace_back(kv.second.lock());
+        }
+        return consumers;
+    }
+
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
         consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
     }
diff --git a/tests/ShutdownTest.cc b/tests/ShutdownTest.cc
new file mode 100644
index 0000000..e32a95c
--- /dev/null
+++ b/tests/ShutdownTest.cc
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <atomic>
+#include <ctime>
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include "lib/ClientImpl.h"
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+enum class EndToEndType : uint8_t
+{
+    SINGLE_TOPIC,
+    MULTI_TOPICS,
+    REGEX_TOPICS
+};
+
+class ShutdownTest : public ::testing::TestWithParam<EndToEndType> {
+   public:
+    void SetUp() override {
+        topic_ = topic_ + std::to_string(id_++) + "-" + std::to_string(time(nullptr));
+        if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+            int res = makePutRequest(
+                "http://localhost:8080/admin/v2/persistent/public/default/" + topic_ + "/partitions", "2");
+            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+        }
+    }
+
+   protected:
+    Client client_{lookupUrl};
+    decltype(PulsarFriend::getProducers(client_)) producers_{PulsarFriend::getProducers(client_)};
+    decltype(PulsarFriend::getConsumers(client_)) consumers_{PulsarFriend::getConsumers(client_)};
+    std::string topic_ = "shutdown-test-";
+
+    static std::atomic_int id_;
+
+    Result subscribe(Consumer &consumer) {
+        if (GetParam() == EndToEndType::REGEX_TOPICS) {
+            // NOTE: Currently the regex subscription requires the complete namespace prefix
+            return client_.subscribeWithRegex("persistent://public/default/" + topic_ + ".*", "sub",
+                                              consumer);
+        } else {
+            return client_.subscribe(topic_, "sub", consumer);
+        }
+    }
+
+    void assertConnectionsEmpty() {
+        auto connections = PulsarFriend::getConnections(client_);
+        for (const auto &cnx : PulsarFriend::getConnections(client_)) {
+            EXPECT_TRUE(PulsarFriend::getProducers(*cnx).empty());
+            EXPECT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
+        }
+    }
+};
+
+std::atomic_int ShutdownTest::id_{0};
+
+TEST_P(ShutdownTest, testClose) {
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+    EXPECT_EQ(producers_.size(), 1);
+    ASSERT_EQ(ResultOk, producer.close());
+    EXPECT_EQ(producers_.size(), 0);
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, subscribe(consumer));
+    EXPECT_EQ(consumers_.size(), 1);
+    ASSERT_EQ(ResultOk, consumer.close());
+    EXPECT_EQ(consumers_.size(), 0);
+
+    ASSERT_EQ(ResultOk, subscribe(consumer));
+    EXPECT_EQ(consumers_.size(), 1);
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    EXPECT_EQ(consumers_.size(), 0);
+
+    assertConnectionsEmpty();
+    ASSERT_EQ(ResultOk, client_.close());
+}
+
+TEST_P(ShutdownTest, testDestructor) {
+    {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+        EXPECT_EQ(producers_.size(), 1);
+    }
+    EXPECT_EQ(producers_.size(), 0);
+
+    {
+        Consumer consumer;
+        ASSERT_EQ(ResultOk, subscribe(consumer));
+        EXPECT_EQ(consumers_.size(), 1);
+    }
+    EXPECT_EQ(consumers_.size(), 0);
+
+    assertConnectionsEmpty();
+    client_.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ShutdownTest,
+                         ::testing::Values(EndToEndType::SINGLE_TOPIC, EndToEndType::MULTI_TOPICS,
+                                           EndToEndType::REGEX_TOPICS));
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
deleted file mode 100644
index abe3efc..0000000
--- a/tests/WaitUtils.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <chrono>
-#include <functional>
-#include <thread>
-
-namespace pulsar {
-
-template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
-    auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
-    while (timeoutMs > 0) {
-        auto now = std::chrono::high_resolution_clock::now();
-        if (condition()) {
-            break;
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
-        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
-                           std::chrono::high_resolution_clock::now() - now)
-                           .count();
-        timeoutMs -= elapsed;
-    }
-}
-
-}  // namespace pulsar