You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/21 04:14:53 UTC

[pulsar-client-cpp] branch branch-3.0 updated (6fdcf21 -> f70aa89)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


    from 6fdcf21  Fixed validation of cluster/namespace names containing legal non-alpha characters (#52)
     new a671640  Use parametrized job to build packages (#36)
     new d7ccefd  Updated Github actions versions to avoid deprecations (#49)
     new 99d5dc1  [feat] Consumer support batch receive messages. (#21)
     new 361a621  [fix] Fix memory leak caused by incorrect close and destruction (#54)
     new b617a7f  [flaky-test] Fix very flaky tests for TEST_P (#59)
     new f70aa89  Fixed the release artifacts package file name (#113)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-build-binary-artifacts.yaml   | 217 ++------------
 .github/workflows/ci-pr-validation.yaml            |  99 ++----
 include/pulsar/BatchReceivePolicy.h                |  90 ++++++
 include/pulsar/Consumer.h                          |  25 ++
 include/pulsar/ConsumerConfiguration.h             |  18 ++
 lib/BatchReceivePolicy.cc                          |  57 ++++
 .../crc32c_sw.h => BatchReceivePolicyImpl.h}       |   8 +-
 lib/ClientConnection.h                             |   2 +-
 lib/ClientImpl.cc                                  |  89 +++---
 lib/ClientImpl.h                                   |  18 +-
 lib/ConnectionPool.h                               |   4 +-
 lib/Consumer.cc                                    |  18 ++
 lib/ConsumerConfiguration.cc                       |   9 +
 lib/ConsumerConfigurationImpl.h                    |   1 +
 lib/ConsumerImpl.cc                                | 283 +++++++++---------
 lib/ConsumerImpl.h                                 |  20 +-
 lib/ConsumerImplBase.cc                            | 141 +++++++++
 lib/ConsumerImplBase.h                             |  45 ++-
 lib/HandlerBase.cc                                 |  24 +-
 lib/HandlerBase.h                                  |  21 +-
 lib/MessagesImpl.cc                                |  58 ++++
 .../pulsar/MessageBatch.h => lib/MessagesImpl.h    |  37 ++-
 lib/MultiTopicsConsumerImpl.cc                     | 331 +++++++++++++++------
 lib/MultiTopicsConsumerImpl.h                      |  33 +-
 lib/PartitionedProducerImpl.cc                     |  83 ++++--
 lib/PartitionedProducerImpl.h                      |   5 +-
 lib/PatternMultiTopicsConsumerImpl.cc              |  19 +-
 lib/PatternMultiTopicsConsumerImpl.h               |   1 +
 lib/PeriodicTask.cc                                |   5 +-
 lib/PeriodicTask.h                                 |   2 +-
 lib/ProducerImpl.cc                                |  90 +++---
 lib/ProducerImpl.h                                 |   5 +-
 lib/SynchronizedHashMap.h                          |  18 +-
 tests/BasicEndToEndTest.cc                         | 181 +++++++++++
 ...odecSnappyTest.cc => BatchReceivePolicyTest.cc} |  30 +-
 tests/ClientTest.cc                                |  28 +-
 tests/ConsumerConfigurationTest.cc                 |   8 +
 tests/MessagesImplTest.cc                          |  61 ++++
 tests/ProducerTest.cc                              |   6 +-
 tests/PulsarFriend.h                               |  35 ++-
 tests/ShutdownTest.cc                              | 129 ++++++++
 tests/WaitUtils.h                                  |  43 ---
 42 files changed, 1611 insertions(+), 786 deletions(-)
 create mode 100644 include/pulsar/BatchReceivePolicy.h
 create mode 100644 lib/BatchReceivePolicy.cc
 copy lib/{checksum/crc32c_sw.h => BatchReceivePolicyImpl.h} (89%)
 create mode 100644 lib/ConsumerImplBase.cc
 create mode 100644 lib/MessagesImpl.cc
 copy include/pulsar/MessageBatch.h => lib/MessagesImpl.h (61%)
 copy tests/{CompressionCodecSnappyTest.cc => BatchReceivePolicyTest.cc} (55%)
 create mode 100644 tests/MessagesImplTest.cc
 create mode 100644 tests/ShutdownTest.cc
 delete mode 100644 tests/WaitUtils.h


[pulsar-client-cpp] 02/06: Updated Github actions versions to avoid deprecations (#49)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit d7ccefd66dbfd1f837402adc381c366ed028265f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 13 11:14:01 2022 -0700

    Updated Github actions versions to avoid deprecations (#49)
---
 .github/workflows/ci-build-binary-artifacts.yaml |  4 ++--
 .github/workflows/ci-pr-validation.yaml          | 10 +++++-----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/ci-build-binary-artifacts.yaml b/.github/workflows/ci-build-binary-artifacts.yaml
index 21e13db..f2a931e 100644
--- a/.github/workflows/ci-build-binary-artifacts.yaml
+++ b/.github/workflows/ci-build-binary-artifacts.yaml
@@ -47,10 +47,10 @@ jobs:
 
     steps:
       - name: checkout
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
 
       - name: Set up QEMU
-        uses: docker/setup-qemu-action@v1
+        uses: docker/setup-qemu-action@v2
 
       - name: Package Pulsar source
         run: build-support/generate-source-archive.sh
diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index b4b9655..58f1485 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -36,7 +36,7 @@ jobs:
 
     steps:
       - name: checkout
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
 
       - name: Install deps
         run: |
@@ -102,10 +102,10 @@ jobs:
 
     steps:
       - name: checkout
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
 
       - name: Restore vcpkg and its artifacts.
-        uses: actions/cache@v2
+        uses: actions/cache@v3
         id: vcpkg-cache
         with:
           path: |
@@ -194,10 +194,10 @@ jobs:
 
     steps:
       - name: checkout
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
 
       - name: Set up QEMU
-        uses: docker/setup-qemu-action@v1
+        uses: docker/setup-qemu-action@v2
 
       - name: Package Pulsar source
         run: build-support/generate-source-archive.sh


[pulsar-client-cpp] 03/06: [feat] Consumer support batch receive messages. (#21)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 99d5dc1fc1c206b6be6fd700632b59550c686a96
Author: Baodi Shi <ba...@icloud.com>
AuthorDate: Tue Oct 18 10:33:41 2022 +0800

    [feat] Consumer support batch receive messages. (#21)
    
    ### Motivation
    https://github.com/apache/pulsar/issues/17140
    
    This PR has been reviewed in [pulsar repo](https://github.com/apache/pulsar/pull/17429).
    
    ### Modifications
    - Consumer support batch receives messages.
    - Abstract common implementation to `ConsumerImplBase`.
---
 include/pulsar/BatchReceivePolicy.h    |  90 ++++++++++++++++
 include/pulsar/Consumer.h              |  25 +++++
 include/pulsar/ConsumerConfiguration.h |  18 ++++
 lib/BatchReceivePolicy.cc              |  57 +++++++++++
 lib/BatchReceivePolicyImpl.h           |  29 ++++++
 lib/ClientImpl.h                       |   5 +-
 lib/Consumer.cc                        |  18 ++++
 lib/ConsumerConfiguration.cc           |   9 ++
 lib/ConsumerConfigurationImpl.h        |   1 +
 lib/ConsumerImpl.cc                    | 174 +++++++++++++++++--------------
 lib/ConsumerImpl.h                     |  15 +--
 lib/ConsumerImplBase.cc                | 141 +++++++++++++++++++++++++
 lib/ConsumerImplBase.h                 |  45 +++++++-
 lib/HandlerBase.h                      |   1 +
 lib/MessagesImpl.cc                    |  58 +++++++++++
 lib/MessagesImpl.h                     |  46 +++++++++
 lib/MultiTopicsConsumerImpl.cc         | 142 ++++++++++++++++++--------
 lib/MultiTopicsConsumerImpl.h          |  22 ++--
 tests/BasicEndToEndTest.cc             | 181 +++++++++++++++++++++++++++++++++
 tests/BatchReceivePolicyTest.cc        |  40 ++++++++
 tests/ConsumerConfigurationTest.cc     |   8 ++
 tests/MessagesImplTest.cc              |  61 +++++++++++
 22 files changed, 1037 insertions(+), 149 deletions(-)

diff --git a/include/pulsar/BatchReceivePolicy.h b/include/pulsar/BatchReceivePolicy.h
new file mode 100644
index 0000000..3c66da2
--- /dev/null
+++ b/include/pulsar/BatchReceivePolicy.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef BATCH_RECEIVE_POLICY_HPP_
+#define BATCH_RECEIVE_POLICY_HPP_
+
+#include <pulsar/defines.h>
+#include <memory>
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link
+ * Consumer#batchReceiveAsync()}.
+ *
+ * <p>Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a
+ * timeout for waiting for enough messages for this batch.
+ *
+ * <p>A batch receive action is completed as long as any one of the
+ * conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met.
+ *
+ * <p>Examples:
+ * 1. If maxNumMessages = 10 and maxNumBytes = 1MB are set without a timeout, it
+ * means {@link Consumer#batchReceive()} will always wait until there are enough messages.
+ * 2. If maxNumMessages = 0, maxNumBytes = 0 and timeoutMs = 100 are set, it
+ * means {@link Consumer#batchReceive()} will wait for 100ms no matter whether there are enough messages.
+ *
+ * <p>Note:
+ * At least one message limit (maxNumMessages, maxNumBytes) or a wait timeout must be specified.
+ * Otherwise, collecting {@link Message}s into {@link Messages} would never end.
+ *
+ * @since 2.4.1
+ */
+class PULSAR_PUBLIC BatchReceivePolicy {
+   public:
+    /**
+     * Construct the default policy: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+     */
+    BatchReceivePolicy();
+
+    /**
+     * Construct a policy; throws std::invalid_argument if all three arguments are <= 0.
+     * @param maxNumMessage  Max number of messages, if less than 0, it means no limit.
+     * @param maxNumBytes Max number of bytes, if less than 0, it means no limit.
+     * @param timeoutMs Timeout in milliseconds, if less than 0, it means no limit.
+     */
+    BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs);
+
+    /**
+     * Get the timeout in milliseconds.
+     *
+     * @return the timeoutMs value stored in this policy
+     */
+    long getTimeoutMs() const;
+
+    /**
+     * Get the maximum number of messages.
+     * @return the maxNumMessage value stored in this policy
+     */
+    int getMaxNumMessages() const;
+
+    /**
+     * Get the maximum number of bytes.
+     * @return the maxNumBytes value stored in this policy
+     */
+    long getMaxNumBytes() const;
+
+   private:
+    std::shared_ptr<BatchReceivePolicyImpl> impl_;
+};
+}  // namespace pulsar
+
+#endif /* BATCH_RECEIVE_POLICY_HPP_ */
diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index 6c0ab27..c7911b9 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -113,6 +113,31 @@ class PULSAR_PUBLIC Consumer {
      */
     void receiveAsync(ReceiveCallback callback);
 
+    /**
+     * Receive a batch of messages.
+     *
+     * <p>This call blocks until enough messages are available or the wait times out; see {@link
+     * BatchReceivePolicy} for details.
+     *
+     * @param msgs a non-const reference where the received messages will be copied
+     * @return ResultOk when a batch of messages is received
+     * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+     */
+    Result batchReceive(Messages& msgs);
+
+    /**
+     * Asynchronously receive a batch of messages.
+     * <p>
+     * Retrieves messages once they are available and completes the callback with the received batch.
+     * </p>
+     * <p>
+     * batchReceiveAsync() should be called again only after the callback has completed with a batch.
+     * Otherwise it creates a <i>backlog of receive requests</i> in the application.
+     * </p>
+     * @param callback will be completed when messages are available.
+     */
+    void batchReceiveAsync(BatchReceiveCallback callback);
+
     /**
      * Acknowledge the reception of a single message.
      *
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 4347c3b..13d5cc0 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -31,6 +31,7 @@
 #include <pulsar/InitialPosition.h>
 #include <pulsar/KeySharedPolicy.h>
 #include <pulsar/ConsumerEventListener.h>
+#include "BatchReceivePolicy.h"
 
 namespace pulsar {
 
@@ -38,8 +39,10 @@ class Consumer;
 class PulsarWrapper;
 
 /// Callback definition for non-data operation
+typedef std::vector<Message> Messages;
 typedef std::function<void(Result result)> ResultCallback;
 typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
+typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
 typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /// Callback definition for MessageListener
@@ -378,6 +381,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     InitialPosition getSubscriptionInitialPosition() const;
 
+    /**
+     * Set batch receive policy.
+     *
+     * @param batchReceivePolicy the default is
+     * {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+     */
+    void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
+
+    /**
+     * Get batch receive policy.
+     *
+     * @return batch receive policy
+     */
+    const BatchReceivePolicy& getBatchReceivePolicy() const;
+
     /**
      * Set whether the subscription status should be replicated.
      * The default value is `false`.
diff --git a/lib/BatchReceivePolicy.cc b/lib/BatchReceivePolicy.cc
new file mode 100644
index 0000000..08aa368
--- /dev/null
+++ b/lib/BatchReceivePolicy.cc
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/BatchReceivePolicy.h>
+#include "BatchReceivePolicyImpl.h"
+#include "LogUtils.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+DECLARE_LOG_OBJECT()
+
+BatchReceivePolicy::BatchReceivePolicy() : BatchReceivePolicy(-1, 10 * 1024 * 1024, 100) {}
+
+BatchReceivePolicy::BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs)
+    : impl_(std::make_shared<BatchReceivePolicyImpl>()) {
+    if (maxNumMessage <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {  // no usable limit at all
+        throw std::invalid_argument(
+            "At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
+    }
+    if (maxNumMessage <= 0 && maxNumBytes <= 0) {  // only the timeout is usable: apply default limits
+        impl_->maxNumMessage = -1;
+        impl_->maxNumBytes = 10 * 1024 * 1024;
+        LOG_WARN(
+            "BatchReceivePolicy maxNumMessages and maxNumBytes are both <= 0. Reset to default: "
+            "maxNumMessage(-1), maxNumBytes(10 * 1024 * 1024)");
+    } else {
+        impl_->maxNumMessage = maxNumMessage;
+        impl_->maxNumBytes = maxNumBytes;
+    }
+    impl_->timeoutMs = timeoutMs;  // stored as given in both branches
+}
+
+long BatchReceivePolicy::getTimeoutMs() const { return impl_->timeoutMs; }
+
+int BatchReceivePolicy::getMaxNumMessages() const { return impl_->maxNumMessage; }
+
+long BatchReceivePolicy::getMaxNumBytes() const { return impl_->maxNumBytes; }
+
+}  // namespace pulsar
diff --git a/lib/BatchReceivePolicyImpl.h b/lib/BatchReceivePolicyImpl.h
new file mode 100644
index 0000000..e7ba431
--- /dev/null
+++ b/lib/BatchReceivePolicyImpl.h
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl {
+    int maxNumMessage;
+    long maxNumBytes;
+    long timeoutMs;
+};
+
+}  // namespace pulsar
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 466461a..e8e7708 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -28,14 +28,12 @@
 #include <mutex>
 #include <lib/TopicName.h>
 #include "ProducerImplBase.h"
-#include "ConsumerImplBase.h"
 #include <atomic>
 #include <vector>
 #include "ServiceNameResolver.h"
 
 namespace pulsar {
 
-class ClientImpl;
 class PulsarFriend;
 typedef std::shared_ptr<ClientImpl> ClientImplPtr;
 typedef std::weak_ptr<ClientImpl> ClientImplWeakPtr;
@@ -44,6 +42,9 @@ class ReaderImpl;
 typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
 typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 
+class ConsumerImplBase;
+typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
+
 std::string generateRandomName();
 
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
diff --git a/lib/Consumer.cc b/lib/Consumer.cc
index 5d16362..13fb9f4 100644
--- a/lib/Consumer.cc
+++ b/lib/Consumer.cc
@@ -82,6 +82,24 @@ void Consumer::receiveAsync(ReceiveCallback callback) {
     impl_->receiveAsync(callback);
 }
 
+Result Consumer::batchReceive(Messages& msgs) {
+    if (!impl_) {  // no underlying consumer implementation to delegate to
+        return ResultConsumerNotInitialized;
+    }
+    Promise<Result, Messages> promise;
+    impl_->batchReceiveAsync(WaitForCallbackValue<Messages>(promise));  // async call bound to promise
+    return promise.getFuture().get(msgs);  // NOTE(review): presumably blocks until promise completes
+}
+
+void Consumer::batchReceiveAsync(BatchReceiveCallback callback) {
+    if (!impl_) {  // no underlying implementation: fail the callback immediately with an empty batch
+        Messages msgs;
+        callback(ResultConsumerNotInitialized, msgs);
+        return;
+    }
+    impl_->batchReceiveAsync(callback);  // otherwise delegate to the implementation
+}
+
 Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
 
 Result Consumer::acknowledge(const MessageId& messageId) {
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index f9fe499..0705cca 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -19,6 +19,7 @@
 #include <lib/ConsumerConfigurationImpl.h>
 
 #include <stdexcept>
+#include <pulsar/ConsumerConfiguration.h>
 
 namespace pulsar {
 
@@ -267,4 +268,12 @@ ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool st
 
 bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; }
 
+void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) {
+    impl_->batchReceivePolicy = batchReceivePolicy;
+}
+
+const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
+    return impl_->batchReceivePolicy;
+}
+
 }  // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index cca83a3..444fedf 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl {
     ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
     bool readCompacted{false};
     InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
+    BatchReceivePolicy batchReceivePolicy{};
     int patternAutoDiscoveryPeriod{60};
     bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 37fcd95..54e346f 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -18,6 +18,7 @@
  */
 #include "ConsumerImpl.h"
 #include "MessageImpl.h"
+#include "MessagesImpl.h"
 #include "Commands.h"
 #include "LogUtils.h"
 #include "TimeUtils.h"
@@ -43,7 +44,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
                            Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
-    : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
+    : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+                       listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
       waitingForZeroQueueSizeMessage(false),
       config_(conf),
       subscription_(subscriptionName),
@@ -85,13 +87,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
 
-    // Initialize listener executor.
-    if (listenerExecutor) {
-        listenerExecutor_ = listenerExecutor;
-    } else {
-        listenerExecutor_ = client->getListenerExecutorProvider()->get();
-    }
-
     // Setup stats reporter.
     unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
     if (statsIntervalInSeconds) {
@@ -145,12 +140,12 @@ const std::string& ConsumerImpl::getTopic() const { return topic_; }
 void ConsumerImpl::start() {
     HandlerBase::start();
 
-    // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+    // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
     // constructor completed.
     if (TopicName::get(topic_)->isPersistent()) {
         if (config_.getAckGroupingTimeMs() > 0) {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+                client_.lock(), get_shared_this_ptr(), consumerId_, config_.getAckGroupingTimeMs(),
                 config_.getAckGroupingMaxSize()));
         } else {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
@@ -169,7 +164,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
 
     // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
     // sending the subscribe request.
-    cnx->registerConsumer(consumerId_, shared_from_this());
+    cnx->registerConsumer(consumerId_, get_shared_this_ptr());
 
     if (duringSeek_) {
         ackGroupingTrackerPtr_->flushAndClean();
@@ -195,13 +190,13 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
         config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
         config_.getKeySharedPolicy(), config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
-        .addListener(
-            std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
+        .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
+                               std::placeholders::_1));
 }
 
 void ConsumerImpl::connectionFailed(Result result) {
     // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = shared_from_this();
+    auto ptr = get_shared_this_ptr();
 
     if (consumerCreatedPromise_.setFailed(result)) {
         state_ = Failed;
@@ -244,7 +239,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
                 sendFlowPermitsToBroker(cnx, 1);
             }
         }
-        consumerCreatedPromise_.setValue(shared_from_this());
+        consumerCreatedPromise_.setValue(get_shared_this_ptr());
     } else {
         if (result == ResultTimeout) {
             // Creating the consumer has timed out. We need to ensure the broker closes the consumer
@@ -257,12 +252,12 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
         if (consumerCreatedPromise_.isComplete()) {
             // Consumer had already been initially created, we need to retry connecting in any case
             LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
-            scheduleReconnection(shared_from_this());
+            scheduleReconnection(get_shared_this_ptr());
         } else {
             // Consumer was not yet created, retry to connect to broker if it's possible
             if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
                 LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result));
-                scheduleReconnection(shared_from_this());
+                scheduleReconnection(get_shared_this_ptr());
             } else {
                 LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
                 consumerCreatedPromise_.setFailed(result);
@@ -292,7 +287,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
         int requestId = client->newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
         cnx->sendRequestWithId(cmd, requestId)
-            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, shared_from_this(),
+            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
                                    std::placeholders::_1, callback));
     } else {
         Result result = ResultNotConnected;
@@ -460,34 +455,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                 << startMessageId.value());
             return;
         }
-
-        Lock lock(pendingReceiveMutex_);
-        // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-        bool asyncReceivedWaiting = !pendingReceives_.empty();
-        ReceiveCallback callback;
-        if (asyncReceivedWaiting) {
-            callback = pendingReceives_.front();
-            pendingReceives_.pop();
-        }
-        lock.unlock();
-
-        if (asyncReceivedWaiting) {
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                  shared_from_this(), ResultOk, m, callback));
-            return;
-        }
-
-        // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
-        if (config_.getReceiverQueueSize() != 0 ||
-            (config_.getReceiverQueueSize() == 0 && messageListener_)) {
-            incomingMessages_.push(m);
-        } else {
-            Lock lock(mutex_);
-            if (waitingForZeroQueueSizeMessage) {
-                lock.unlock();
-                incomingMessages_.push(m);
-            }
-        }
+        executeNotifyCallback(m);
     }
 
     if (messageListener_) {
@@ -496,7 +464,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         }
         // Trigger message listener callback in a separate thread
         while (numOfMessageReceived--) {
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
         }
     }
 }
@@ -504,16 +472,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
 void ConsumerImpl::activeConsumerChanged(bool isActive) {
     if (eventListener_) {
         listenerExecutor_->postWork(
-            std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive));
+            std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive));
     }
 }
 
 void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
     try {
         if (isActive) {
-            eventListener_->becameActive(Consumer(shared_from_this()), partitionIndex_);
+            eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_);
         } else {
-            eventListener_->becameInactive(Consumer(shared_from_this()), partitionIndex_);
+            eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_);
         }
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
@@ -527,11 +495,56 @@ void ConsumerImpl::failPendingReceiveCallback() {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+// Routes a newly received message: a waiting receiveAsync() callback takes priority over queueing.
+void ConsumerImpl::executeNotifyCallback(Message& msg) {
+    Lock lock(pendingReceiveMutex_);
+    // Claim the oldest pending receiveAsync() callback, if any, while holding pendingReceiveMutex_.
+    bool asyncReceivedWaiting = !pendingReceives_.empty();
+    ReceiveCallback callback;
+    if (asyncReceivedWaiting) {
+        callback = pendingReceives_.front();
+        pendingReceives_.pop();
+    }
+    lock.unlock();
+
+    // A callback was claimed: hand the message to it on the listener executor and skip the queue.
+    if (asyncReceivedWaiting) {
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
+
+    // Queue it when a listener is set, the receiver queue is non-zero, or a zero-queue receive waits.
+    if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
+        incomingMessages_.push(msg);
+        incomingMessagesSize_.fetch_add(msg.getLength());
+    }
+
+    // The queue may now satisfy the batch receive policy: complete the oldest pending batch receive.
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;  // NOTE(review): pop() precedes canAdd(); a rejected message is never pushed back
+    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
+        messageProcessed(peekMsg);  // per-message bookkeeping for every message moved into the batch
+        messages->add(peekMsg);
+    }
+    auto self = get_shared_this_ptr();  // captured below so the posted task holds a consumer reference
+    listenerExecutor_->postWork(
+        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
 void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
                                                  const ReceiveCallback& callback) {
     if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
@@ -573,19 +586,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
             }
         }
 
-        //
-        Lock lock(pendingReceiveMutex_);
-        if (!pendingReceives_.empty()) {
-            ReceiveCallback callback = pendingReceives_.front();
-            pendingReceives_.pop();
-            lock.unlock();
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                  shared_from_this(), ResultOk, msg, callback));
-        } else {
-            // Regular path, append individual message to incoming messages queue
-            incomingMessages_.push(msg);
-            lock.unlock();
-        }
+        executeNotifyCallback(msg);
     }
 
     if (skippedMessages > 0) {
@@ -698,7 +699,7 @@ void ConsumerImpl::internalListener() {
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessageId_ = msg.getMessageId();
-        messageListener_(Consumer(shared_from_this()), msg);
+        messageListener_(Consumer(get_shared_this_ptr()), msg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
     }
@@ -721,9 +722,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
         incomingMessages_.clear();
     }
-    Lock localLock(mutex_);
     waitingForZeroQueueSizeMessage = true;
-    localLock.unlock();
 
     sendFlowPermitsToBroker(currentCnx, 1);
 
@@ -745,7 +744,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             }
         }
     }
-    return ResultOk;
 }
 
 Result ConsumerImpl::receive(Message& msg) {
@@ -837,6 +835,8 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
     lastDequedMessageId_ = msg.getMessageId();
     lock.unlock();
 
+    incomingMessagesSize_.fetch_sub(msg.getLength());
+
     ClientConnectionPtr currentCnx = getCnx().lock();
     if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
         LOG_DEBUG(getName() << "Not adding permit since connection is different.");
@@ -934,7 +934,7 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
 }
 
 void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
-    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
                                   callback, proto::CommandAck_AckType_Individual);
     if (msgId.batchIndex() != -1 &&
         !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
@@ -945,7 +945,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
 }
 
 void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
                                   callback, proto::CommandAck_AckType_Cumulative);
     if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
         cb(ResultCumulativeAcknowledgementNotAllowedError);
@@ -993,12 +993,12 @@ void ConsumerImpl::disconnectConsumer() {
     Lock lock(mutex_);
     connection_.reset();
     lock.unlock();
-    scheduleReconnection(shared_from_this());
+    scheduleReconnection(get_shared_this_ptr());
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
     // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = shared_from_this();
+    ConsumerImplPtr ptr = get_shared_this_ptr();
 
     if (state_ != Ready) {
         if (callback) {
@@ -1041,12 +1041,16 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
     if (callback) {
         // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(
-            std::bind(&ConsumerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
+        future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
+                                     callback, ptr));
     }
 
     // fail pendingReceive callback
     failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+
+    // cancel timer
+    batchReceiveTimer_->cancel();
 }
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
@@ -1102,7 +1106,7 @@ Result ConsumerImpl::resumeMessageListener() {
 
     for (size_t i = 0; i < count; i++) {
         // Trigger message listener callback in a separate thread
-        listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
     }
     // Check current permits and determine whether to send FLOW command
     this->increaseAvailablePermits(getCnx().lock(), 0);
@@ -1167,7 +1171,7 @@ void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callb
                                 << ", requestId - " << requestId);
 
             cnx->newConsumerStats(consumerId_, requestId)
-                .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(),
+                .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(),
                                        std::placeholders::_1, std::placeholders::_2, callback));
             return;
         } else {
@@ -1303,7 +1307,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
             LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
                                 << ", requestId - " << requestId);
 
-            auto self = shared_from_this();
+            auto self = get_shared_this_ptr();
             cnx->newGetLastMessageId(consumerId_, requestId)
                 .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
                     if (result == ResultOk) {
@@ -1384,7 +1388,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
         LOG_INFO(getName() << " Seeking subscription to " << seekId);
     }
 
-    std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
 
     cnx->sendRequestWithId(seek, requestId)
         .addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
@@ -1419,4 +1423,18 @@ bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
                                                : idx <= startMessageId_.get().value().entryId();
 }
 
+// True once the queued messages reach either enabled limit of the batch receive policy: the
+// number of queued messages or their accumulated byte size. A limit is enabled only when it
+// is > 0, so with both limits disabled the result is always false.
+bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    const auto maxMessages = batchReceivePolicy_.getMaxNumMessages();
+    if (maxMessages > 0 && incomingMessages_.size() >= maxMessages) return true;
+    const auto maxBytes = batchReceivePolicy_.getMaxNumBytes();
+    return maxBytes > 0 && incomingMessagesSize_ >= maxBytes;
+}
+
+std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
+    return std::static_pointer_cast<ConsumerImpl>(shared_from_this());  // base pointer -> ConsumerImpl
+}
+
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 1ad3a4c..09d2c5c 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -65,9 +65,7 @@ enum ConsumerTopicType
     Partitioned
 };
 
-class ConsumerImpl : public ConsumerImplBase,
-                     public HandlerBase,
-                     public std::enable_shared_from_this<ConsumerImpl> {
+class ConsumerImpl : public ConsumerImplBase {
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
                  const ConsumerConfiguration&, bool isPersistent,
@@ -147,7 +145,10 @@ class ConsumerImpl : public ConsumerImplBase,
     // overrided methods from HandlerBase
     void connectionOpened(const ClientConnectionPtr& cnx) override;
     void connectionFailed(Result result) override;
-    HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+    // methods implemented from ConsumerImplBase
+    bool hasEnoughMessagesForBatchReceive() const override;
+    void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
 
     void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
 
@@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
-    bool waitingForZeroQueueSizeMessage;
+    std::atomic_bool waitingForZeroQueueSizeMessage;
+    std::shared_ptr<ConsumerImpl> get_shared_this_ptr();
     bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData,
                                    const proto::MessageMetadata& metadata, SharedBuffer& payload,
                                    bool checkMaxMessageSize);
@@ -180,6 +182,7 @@ class ConsumerImpl : public ConsumerImplBase,
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+    void executeNotifyCallback(Message& msg);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
@@ -199,13 +202,13 @@ class ConsumerImpl : public ConsumerImplBase,
     const bool isPersistent_;
     MessageListener messageListener_;
     ConsumerEventListenerPtr eventListener_;
-    ExecutorServicePtr listenerExecutor_;
     bool hasParent_;
     ConsumerTopicType consumerTopicType_;
 
     const Commands::SubscriptionMode subscriptionMode_;
 
     UnboundedBlockingQueue<Message> incomingMessages_;
+    std::atomic_int incomingMessagesSize_ = {0};
     std::queue<ReceiveCallback> pendingReceives_;
     std::atomic_int availablePermits_;
     const int receiverQueueRefillThreshold_;
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
new file mode 100644
index 0000000..4a8c027
--- /dev/null
+++ b/lib/ConsumerImplBase.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+// Stores the listener executor, adopts the configured batch receive policy (capping its
+// maxNumMessages at the receiver queue size, with a warning) and creates the batch receive timer.
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    const auto requestedMaxNumMessages = batchReceivePolicy_.getMaxNumMessages();
+    if (requestedMaxNumMessages > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), batchReceivePolicy_.getMaxNumBytes(),
+                               batchReceivePolicy_.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {"
+                 << requestedMaxNumMessages << "} is greater than maxReceiverQueueSize: {"
+                 << conf.getReceiverQueueSize() << "}, reset to maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+// Arms batchReceiveTimer_ to run doBatchReceiveTimeTask() after timeoutMs milliseconds; a
+// non-positive timeout arms nothing. The handler holds only a weak reference and is a no-op
+// when the consumer is already gone or the wait completed with an error.
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs <= 0) return;
+    batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+    std::weak_ptr<ConsumerImplBase> weakThis{shared_from_this()};
+    batchReceiveTimer_->async_wait([weakThis](const boost::system::error_code& error) {
+        auto strongThis = weakThis.lock();
+        if (strongThis && !error) strongThis->doBatchReceiveTimeTask();
+    });
+}
+
+// Timer task: completes every pending batch receive whose timeout has elapsed, then re-arms the
+// timer for the oldest request that is still waiting. Does nothing unless state_ is Ready.
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    long timeToWaitMs = 0;  // stays 0 unless a request that has not timed out yet is found
+    {
+        // Lock order matters: batchReceiveAsync() takes batchReceiveOptionMutex_ before
+        // batchPendingReceiveMutex_, so the same order is used here to avoid a deadlock.
+        Lock batchOptionLock(batchReceiveOptionMutex_);
+        Lock lock(batchPendingReceiveMutex_);
+        while (!batchPendingReceives_.empty()) {
+            OpBatchReceive& batchReceive = batchPendingReceives_.front();
+            const auto elapsedMs = TimeUtils::currentTimeMillis() - batchReceive.createAt_;
+            const long remainingMs = batchReceivePolicy_.getTimeoutMs() - elapsedMs;
+            if (remainingMs > 0) {
+                timeToWaitMs = remainingMs;
+                break;
+            }
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        }
+    }
+
+    if (timeToWaitMs > 0) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+// Fails every queued batch receive with ResultAlreadyClosed, posted to the listener executor.
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    for (; !batchPendingReceives_.empty(); batchPendingReceives_.pop()) {
+        // The posted task owns a copy of the callback; the queued op itself is destroyed by pop().
+        const BatchReceiveCallback callback = batchPendingReceives_.front().batchReceiveCallback_;
+        listenerExecutor_->postWork([callback]() { callback(ResultAlreadyClosed, {}); });
+    }
+}
+
+// Completes the oldest pending batch receive; its callback is copied since pop() destroys the op.
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (batchPendingReceives_.empty()) return;
+    const BatchReceiveCallback callback = batchPendingReceives_.front().batchReceiveCallback_;
+    batchPendingReceives_.pop();
+    lock.unlock();
+    notifyBatchPendingReceivedCallback(callback);
+}
+
+// Asynchronous batch receive. Fails with ResultAlreadyClosed unless state_ is Ready; completes at
+// once when enough messages are queued; otherwise parks the callback and arms the timeout timer.
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        notifyBatchPendingReceivedCallback(callback);
+        return;
+    }
+    OpBatchReceive pendingOp(callback);
+    Lock pendingLock(batchPendingReceiveMutex_);
+    batchPendingReceives_.emplace(pendingOp);
+    pendingLock.unlock();
+    triggerBatchReceiveTimerTask(batchReceivePolicy_.getTimeoutMs());
+}
+
+OpBatchReceive::OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback)  // stamps createAt_
+    : batchReceiveCallback_(batchReceiveCallback), createAt_(TimeUtils::currentTimeMillis()) {}
+
+} /* namespace pulsar */
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 693d4da..18b8bc1 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const int64_t createAt_;
+};
+
+class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {
+   public:
+    virtual ~ConsumerImplBase(){};
+    ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                     const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor);
+
+    // interface by consumer
     virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() = 0;
-    virtual const std::string& getSubscriptionName() const = 0;
     virtual const std::string& getTopic() const = 0;
+    virtual const std::string& getSubscriptionName() const = 0;
     virtual Result receive(Message& msg) = 0;
     virtual Result receive(Message& msg, int timeout) = 0;
     virtual void receiveAsync(ReceiveCallback& callback) = 0;
+    void batchReceiveAsync(BatchReceiveCallback callback);
     virtual void unsubscribeAsync(ResultCallback callback) = 0;
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -49,7 +64,6 @@ class ConsumerImplBase {
     virtual Result resumeMessageListener() = 0;
     virtual void redeliverUnacknowledgedMessages() = 0;
     virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
-    virtual const std::string& getName() const = 0;
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -57,6 +71,27 @@ class ConsumerImplBase {
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
     virtual bool isConnected() const = 0;
     virtual uint64_t getNumberOfConnectedConsumer() = 0;
+    // overridden methods from HandlerBase
+    virtual const std::string& getName() const override = 0;
+
+   protected:
+    // overridden methods from HandlerBase
+    void connectionOpened(const ClientConnectionPtr& cnx) override {}
+    void connectionFailed(Result result) override {}
+    HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+    // consumer impl generic method.
+    ExecutorServicePtr listenerExecutor_;
+    std::queue<OpBatchReceive> batchPendingReceives_;
+    BatchReceivePolicy batchReceivePolicy_;
+    DeadlineTimerPtr batchReceiveTimer_;
+    std::mutex batchReceiveOptionMutex_;
+    void triggerBatchReceiveTimerTask(long timeoutMs);
+    void doBatchReceiveTimeTask();
+    void failPendingBatchReceiveCallback();
+    void notifyBatchPendingReceivedCallback();
+    virtual void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) = 0;
+    virtual bool hasEnoughMessagesForBatchReceive() const = 0;
 
    private:
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 1184746..6fc3603 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -90,6 +90,7 @@ class HandlerBase {
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
+    std::mutex batchPendingReceiveMutex_;
     ptime creationTimestamp_;
 
     const TimeDuration operationTimeut_;
diff --git a/lib/MessagesImpl.cc b/lib/MessagesImpl.cc
new file mode 100644
index 0000000..7d45cdd
--- /dev/null
+++ b/lib/MessagesImpl.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),  // enforced by canAdd() only when > 0
+      maxSizeOfMessages_(maxSizeOfMessages),      // enforced by canAdd() only when > 0
+      currentSizeOfMessages_(0) {}                // running sum of getLength() of added messages
+
+const std::vector<Message>& MessagesImpl::getMessageList() const { return messageList_; }  // no copy
+
+// Reports whether `message` fits into this batch. An empty batch accepts any message; after
+// that, each limit is enforced only when it is > 0.
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (messageList_.empty()) {
+        return true;
+    }
+
+    // Message-count limit: would one more message exceed it?
+    if (maxNumberOfMessages_ > 0 && messageList_.size() + 1 > maxNumberOfMessages_) {
+        return false;
+    }
+
+    // Byte-size limit: would the accumulated size plus this message exceed it?
+    return !(maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_);
+}
+
+// Appends `message` and adds its length to the running byte total.
+// Throws std::invalid_argument when canAdd(message) is false.
+void MessagesImpl::add(const Message& message) {
+    if (!canAdd(message)) throw std::invalid_argument("No more space to add messages.");
+    currentSizeOfMessages_ += message.getLength();
+    messageList_.push_back(message);
+}
+
+int MessagesImpl::size() const { return messageList_.size(); }  // number of messages currently held
+
+void MessagesImpl::clear() {
+    currentSizeOfMessages_ = 0;  // reset the running byte total
+    messageList_.clear();        // drop every held message
+}
diff --git a/lib/MessagesImpl.h b/lib/MessagesImpl.h
new file mode 100644
index 0000000..0c12768
--- /dev/null
+++ b/lib/MessagesImpl.h
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MESSAGESIMPL_H
+#define PULSAR_CPP_MESSAGESIMPL_H
+
+#include <vector>
+#include <pulsar/Message.h>
+
+using namespace pulsar;
+
+namespace pulsar {
+
+class MessagesImpl {  // a batch of messages bounded by a message-count limit and a byte-size limit
+   public:
+    MessagesImpl(const int maxNumberOfMessages, const long maxSizeOfMessages);
+    const std::vector<Message>& getMessageList() const;
+    bool canAdd(const Message& message) const;  // true if empty or no enabled (> 0) limit is exceeded
+    void add(const Message& message);           // throws std::invalid_argument when canAdd() is false
+    int size() const;                           // number of messages currently held
+    void clear();                               // removes all messages and resets the byte total
+
+   private:
+    std::vector<Message> messageList_;
+    const int maxNumberOfMessages_;  // message-count limit; enforced only when > 0
+    const long maxSizeOfMessages_;   // byte-size limit; enforced only when > 0
+    long currentSizeOfMessages_;     // sum of getLength() over the held messages
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_CPP_MESSAGESIMPL_H
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 0d730e1..573c33d 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -18,6 +18,7 @@
  */
 #include "MultiTopicsConsumerImpl.h"
 #include "MultiResultCallback.h"
+#include "MessagesImpl.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -27,12 +28,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
                                                  const std::string& subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& conf,
                                                  LookupServicePtr lookupServicePtr)
-    : client_(client),
+    : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
+                       Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+                       client->getListenerExecutorProvider()->get()),
+      client_(client),
       subscriptionName_(subscriptionName),
-      topic_(topicName ? topicName->toString() : "EmptyTopics"),
       conf_(conf),
-      messages_(conf.getReceiverQueueSize()),
-      listenerExecutor_(client->getListenerExecutorProvider()->get()),
+      incomingMessages_(conf.getReceiverQueueSize()),
       messageListener_(conf.getMessageListener()),
       lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
@@ -59,14 +61,16 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
         lookupServicePtr_ = client_->getLookup();
     }
+
+    state_ = Pending;
 }
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        MultiTopicsConsumerState state = Pending;
+        State state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
-            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
             return;
         } else {
             LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
@@ -81,7 +85,7 @@ void MultiTopicsConsumerImpl::start() {
     // subscribe for each passed in topic
     for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
         subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
-                                                           shared_from_this(), std::placeholders::_1,
+                                                           get_shared_this_ptr(), std::placeholders::_1,
                                                            std::placeholders::_2, *itr, topicsNeedCreate));
     }
 }
@@ -100,10 +104,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     }
 
     if (--(*topicsNeedCreate) == 0) {
-        MultiTopicsConsumerState state = Pending;
+        State state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
-            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
@@ -162,7 +166,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
 
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
                                         std::placeholders::_1, std::placeholders::_2));
 
     int partitions = numPartitions == 0 ? 1 : numPartitions;
@@ -186,8 +190,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                   topicName->isPersistent(), internalListenerExecutor, true,
                                                   NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
-            &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
-            std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+            &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
+            std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
         consumers_.emplace(topicName->toString(), consumer);
         LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
         consumer->start();
@@ -199,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                       topicName->isPersistent(), internalListenerExecutor,
                                                       true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
-                &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+                &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                 std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
             consumer->setPartitionIndex(i);
             consumers_.emplace(topicPartitionName, consumer);
@@ -236,7 +240,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
-        topicSubResultPromise->setValue(Consumer(shared_from_this()));
+        topicSubResultPromise->setValue(Consumer(get_shared_this_ptr()));
     }
 }
 
@@ -252,7 +256,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     state_ = Closing;
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
-    auto self = shared_from_this();
+    auto self = get_shared_this_ptr();
     int numConsumers = 0;
     consumers_.forEachValue(
         [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
@@ -329,7 +333,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         }
 
         optConsumer.value()->unsubscribeAsync(
-            std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(),
+            std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, get_shared_this_ptr(),
                       std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName,
                       callback));
     }
@@ -385,7 +389,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     state_ = Closing;
 
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
     int numConsumers = 0;
     consumers_.clear(
         [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -414,7 +418,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
                 }
                 // closed all consumers
                 if (numConsumersLeft == 0) {
-                    messages_.clear();
+                    incomingMessages_.clear();
                     topicsPartitions_.clear();
                     unAckedMessageTrackerPtr_->clear();
 
@@ -440,6 +444,10 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     // fail pending receive
     failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+
+    // cancel timer
+    batchReceiveTimer_->cancel();
 }
 
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
@@ -454,25 +462,39 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         pendingReceives_.pop();
         lock.unlock();
         listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultOk, msg, callback));
-    } else {
-        if (messages_.full()) {
-            lock.unlock();
-        }
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
 
-        if (messages_.push(msg) && messageListener_) {
-            listenerExecutor_->postWork(
-                std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
-        }
+    if (incomingMessages_.full()) {
+        lock.unlock();
+    }
+
+    // Add the message to the blocking queue.
+    // When the queue is full, this blocks the listener thread of the ConsumerImpl,
+    // so no more permits are sent to the broker and the broker stops pushing messages.
+    incomingMessages_.push(msg);
+    incomingMessagesSize_.fetch_add(msg.getLength());
+
+    // try trigger pending batch messages
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+    batchOptionLock.unlock();
+
+    if (messageListener_) {
+        listenerExecutor_->postWork(
+            std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer));
     }
 }
 
 void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
     Message m;
-    messages_.pop(m);
-    unAckedMessageTrackerPtr_->add(m.getMessageId());
+    incomingMessages_.pop(m);
     try {
-        messageListener_(Consumer(shared_from_this()), m);
+        messageListener_(Consumer(get_shared_this_ptr()), m);
+        messageProcessed(m);
     } catch (const std::exception& e) {
         LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
     }
@@ -487,9 +509,9 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    messages_.pop(msg);
+    incomingMessages_.pop(msg);
+    messageProcessed(msg);
 
-    unAckedMessageTrackerPtr_->add(msg.getMessageId());
     return ResultOk;
 }
 
@@ -503,8 +525,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
+        messageProcessed(msg);
         return ResultOk;
     } else {
         if (state_ != Ready) {
@@ -524,9 +546,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     }
 
     Lock lock(pendingReceiveMutex_);
-    if (messages_.pop(msg, std::chrono::milliseconds(0))) {
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        messageProcessed(msg);
         callback(ResultOk, msg);
     } else {
         pendingReceives_.push(callback);
@@ -536,14 +558,14 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     Message msg;
 
-    messages_.close();
+    incomingMessages_.close();
 
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
@@ -649,7 +671,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
     });
 }
 
-int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
     if (state_ != Ready) {
@@ -662,7 +684,7 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
     lock.unlock();
 
-    auto self = shared_from_this();
+    auto self = get_shared_this_ptr();
     size_t i = 0;
     consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
         size_t index = i++;
@@ -750,7 +772,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
 }
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
     partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
         // cannot continue at this time, and the request needs to be ignored.
@@ -769,7 +791,7 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
         auto topicName = TopicName::get(item.first);
         auto currentNumPartitions = item.second;
         lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName,
+            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
                       std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
     }
 }
@@ -810,7 +832,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
     ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
                                         std::placeholders::_1, std::placeholders::_2));
 
     // Apply total limit of receiver queue size across partitions
@@ -824,7 +846,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
                                                    topicName->isPersistent(), internalListenerExecutor, true,
                                                    Partitioned);
     consumer->getConsumerCreatedFuture().addListener(
-        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                   std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
     consumer->setPartitionIndex(partitionIndex);
     consumer->start();
@@ -832,3 +854,35 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
                                             << " consumerSize: " << consumers_.size());
 }
+
+bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) {
+        return false;
+    }
+    return (batchReceivePolicy_.getMaxNumMessages() > 0 &&
+            incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) ||
+           (batchReceivePolicy_.getMaxNumBytes() > 0 &&
+            incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes());
+}
+
+void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
+        messageProcessed(peekMsg);
+        messages->add(peekMsg);
+    }
+    auto self = get_shared_this_ptr();
+    listenerExecutor_->postWork(
+        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
+void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
+    incomingMessagesSize_.fetch_sub(msg.getLength());
+    unAckedMessageTrackerPtr_->add(msg.getMessageId());
+}
+
+std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
+    return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 8769d59..044f417 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -38,17 +38,8 @@ namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
 
 class MultiTopicsConsumerImpl;
-class MultiTopicsConsumerImpl : public ConsumerImplBase,
-                                public std::enable_shared_from_this<MultiTopicsConsumerImpl> {
+class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
-    enum MultiTopicsConsumerState
-    {
-        Pending,
-        Ready,
-        Closing,
-        Closed,
-        Failed
-    };
     MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr topicName,
                             const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
@@ -99,16 +90,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     const ClientImplPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
-    std::string topic_;
     const ConsumerConfiguration conf_;
     typedef SynchronizedHashMap<std::string, ConsumerImplPtr> ConsumerMap;
     ConsumerMap consumers_;
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    std::atomic<MultiTopicsConsumerState> state_{Pending};
-    BlockingQueue<Message> messages_;
-    const ExecutorServicePtr listenerExecutor_;
+    BlockingQueue<Message> incomingMessages_;
+    std::atomic_int incomingMessagesSize_ = {0};
     MessageListener messageListener_;
     DeadlineTimerPtr partitionsUpdateTimer_;
     boost::posix_time::time_duration partitionsUpdateInterval_;
@@ -125,6 +114,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
                                               unsigned int partitionIndex);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
+    void messageProcessed(Message& msg);
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
@@ -149,8 +139,12 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
                                     ConsumerSubResultPromisePtr topicSubResultPromise,
                                     std::shared_ptr<std::atomic<int>> partitionsNeedCreate);
+    // impl consumer base virtual method
+    bool hasEnoughMessagesForBatchReceive() const override;
+    void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
 
    private:
+    std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 54a07bb..d3e424e 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -4098,3 +4098,184 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when maxNumMessages > receiver queue size, the receiver queue size is used as the limit.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultAlreadyClosed);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    consumer.close();
+    client.close();
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }
diff --git a/tests/BatchReceivePolicyTest.cc b/tests/BatchReceivePolicyTest.cc
new file mode 100644
index 0000000..ab9ffcc
--- /dev/null
+++ b/tests/BatchReceivePolicyTest.cc
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/BatchReceivePolicy.h>
+
+using namespace pulsar;
+
+TEST(BatchReceivePolicyTest, testBatchReceivePolicy) {
+    ASSERT_THROW(BatchReceivePolicy(-1, -1, -1), std::invalid_argument);
+
+    {
+        BatchReceivePolicy batchReceivePolicy;
+        ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+        ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 100);
+    }
+
+    {
+        BatchReceivePolicy batchReceivePolicy(-1, -1, 123);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+        ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 123);
+    }
+}
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 24f541b..20cd8f4 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -61,6 +61,9 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ASSERT_EQ(conf.getPriorityLevel(), 0);
     ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10);
     ASSERT_EQ(conf.isAutoAckOldestChunkedMessageOnQueueFull(), false);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), -1);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
 }
 
 TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -151,6 +154,11 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
 
     conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
     ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull());
+
+    conf.setBatchReceivePolicy(BatchReceivePolicy(10, 10, 100));
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), 10);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
 }
 
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
diff --git a/tests/MessagesImplTest.cc b/tests/MessagesImplTest.cc
new file mode 100644
index 0000000..e963501
--- /dev/null
+++ b/tests/MessagesImplTest.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test no limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        ASSERT_THROW(messages.add(msg), std::invalid_argument);
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+
+    // 2. test max size of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(-1, 10);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        ASSERT_THROW(messages.add(msg), std::invalid_argument);
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+}


[pulsar-client-cpp] 04/06: [fix] Fix memory leak caused by incorrect close and destruction (#54)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 361a6214894c58c0ab463c41ffb92a2c7458c897
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 20 22:45:20 2022 +0800

    [fix] Fix memory leak caused by incorrect close and destruction (#54)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/55
    
    ### Motivation
    
    1. When a producer or consumer is closed, its reference is still stored
       in `ClientImpl`. If a client keeps creating producers or consumers,
       the memory usage never goes down.
    2. When the `HandlerBase::connection_` field is modified, the
       `removeProducer` or `removeConsumer` method is not called. Then these
       producers and consumers will be cached in the connection until the
       connection is closed.
    3. The `PartitionedProducerImpl` and `MultiTopicsConsumerImpl` have
       cyclic references, so when a `Producer` or `Consumer` instance goes
       out of scope, the destructors are not called. When I used GDB to
       debug them, I found the reference counts were both greater than 1.
    
    ### Modifications
    
    Let's use "handlers" to represent "producers and consumers".
    
    1. In `ClientImpl`, use `SynchronizedHashMap` to store references of
       handlers, as well as the `cleanupXXX` methods to remove a handler.
    2. Add the `HandlerBase::beforeConnectionChange` method, which is called
       before `connection_` is modified. Disallow access to
       `connection_` from derived classes.
    3. Avoid `shared_from_this()` being passed into callbacks in ASIO
       executors for `PartitionedProducerImpl` and
       `MultiTopicsConsumerImpl`.
    
    This PR also unifies the `shutdown` implementations for handlers and
    calls `shutdown` in the destructors. `shutdown` does the following:
    1. Cancel the timers
    2. Unregister itself from `ClientImpl` and `ClientConnection`
    3. Set the create future with `ResultAlreadyClosed`
    4. Set the state to `Closed`
    
    It's called when:
    - the destructor is called
    - `closeAsync` is completed
    - `unsubscribeAsync` is completed with ResultOk
    
    ### Verifications
    
    `ShutdownTest` is added to verify the following cases:
    - a single topic
    - a partitioned topic (multiple topics)
    - a partitioned topic with regex subscription
    
    `testClose` verifies `shutdown` when `closeAsync` and `unsubscribeAsync`
    are called. `testDestructor` verifies `shutdown` when handlers go out of
    scope and the destructors are called.
---
 .github/workflows/ci-pr-validation.yaml |   2 +-
 lib/ClientConnection.h                  |   2 +-
 lib/ClientImpl.cc                       |  89 +++++++------
 lib/ClientImpl.h                        |  13 +-
 lib/ConnectionPool.h                    |   4 +-
 lib/ConsumerImpl.cc                     | 125 +++++++++---------
 lib/ConsumerImpl.h                      |   5 +-
 lib/HandlerBase.cc                      |  24 ++--
 lib/HandlerBase.h                       |  20 ++-
 lib/MultiTopicsConsumerImpl.cc          | 217 ++++++++++++++++++++++----------
 lib/MultiTopicsConsumerImpl.h           |  11 +-
 lib/PartitionedProducerImpl.cc          |  83 ++++++++----
 lib/PartitionedProducerImpl.h           |   5 +-
 lib/PatternMultiTopicsConsumerImpl.cc   |  19 +--
 lib/PatternMultiTopicsConsumerImpl.h    |   1 +
 lib/PeriodicTask.cc                     |   5 +-
 lib/PeriodicTask.h                      |   2 +-
 lib/ProducerImpl.cc                     |  90 ++++++-------
 lib/ProducerImpl.h                      |   5 +-
 lib/SynchronizedHashMap.h               |  18 +--
 tests/ClientTest.cc                     |  28 ++---
 tests/PulsarFriend.h                    |  35 +++++-
 tests/ShutdownTest.cc                   | 121 ++++++++++++++++++
 tests/WaitUtils.h                       |  43 -------
 24 files changed, 610 insertions(+), 357 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 58f1485..1bd128c 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -68,7 +68,7 @@ jobs:
         run: ./pulsar-test-service-start.sh
 
       - name: Run unit tests
-        run: ./run-unit-tests.sh
+        run: RETRY_FAILED=3 ./run-unit-tests.sh
 
       - name: Stop Pulsar service
         run: ./pulsar-test-service-stop.sh
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 418a583..8a48408 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
-    std::mutex mutex_;
+    mutable std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 
     // Pending buffers to write on the socket
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 29e92f3..025727a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
 void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                        CreateProducerCallback callback, ProducerImplBasePtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
-        producers_.push_back(producer);
-        lock.unlock();
+        auto pair = producers_.emplace(producer.get(), producer);
+        if (!pair.second) {
+            auto existingProducer = pair.first->second.lock();
+            LOG_ERROR("Unexpected existing producer at the same address: "
+                      << pair.first->first << ", producer: "
+                      << (existingProducer ? existingProducer->getProducerName() : "(null)"));
+            callback(ResultUnknownError, {});
+            return;
+        }
         callback(result, Producer(producer));
     } else {
         callback(result, {});
@@ -241,9 +247,18 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
     ConsumerImplBasePtr consumer = reader->getConsumer().lock();
     auto self = shared_from_this();
     reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
-        Lock lock(mutex_);
-        consumers_.push_back(weakConsumerPtr);
-        lock.unlock();
+        auto consumer = weakConsumerPtr.lock();
+        if (consumer) {
+            auto pair = consumers_.emplace(consumer.get(), consumer);
+            if (!pair.second) {
+                auto existingConsumer = pair.first->second.lock();
+                LOG_ERROR("Unexpected existing consumer at the same address: "
+                          << pair.first->first
+                          << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+            }
+        } else {
+            LOG_ERROR("Unexpected case: the consumer is somehow expired");
+        }
     });
 }
 
@@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
 void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                        SubscribeCallback callback, ConsumerImplBasePtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
-        consumers_.push_back(consumer);
-        lock.unlock();
+        auto pair = consumers_.emplace(consumer.get(), consumer);
+        if (!pair.second) {
+            auto existingConsumer = pair.first->second.lock();
+            LOG_ERROR("Unexpected existing consumer at the same address: "
+                      << pair.first->first
+                      << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+            callback(ResultUnknownError, {});
+            return;
+        }
         callback(result, Consumer(consumer));
     } else {
         callback(result, {});
@@ -477,27 +498,26 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
 }
 
 void ClientImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-    ProducersList producers(producers_);
-    ConsumersList consumers(consumers_);
-
-    if (state_ != Open && callback) {
-        lock.unlock();
-        callback(ResultAlreadyClosed);
+    if (state_ != Open) {
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
         return;
     }
     // Set the state to Closing so that no producers could get added
     state_ = Closing;
-    lock.unlock();
 
     memoryLimitController_.close();
 
+    auto producers = producers_.move();
+    auto consumers = consumers_.move();
+
     SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
     LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
                                            << " consumers");
 
-    for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
-        ProducerImplBasePtr producer = it->lock();
+    for (auto&& kv : producers) {
+        ProducerImplBasePtr producer = kv.second.lock();
         if (producer && !producer->isClosed()) {
             producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                            std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -507,8 +527,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
         }
     }
 
-    for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
-        ConsumerImplBasePtr consumer = it->lock();
+    for (auto&& kv : consumers) {
+        ConsumerImplBasePtr consumer = kv.second.lock();
         if (consumer && !consumer->isClosed()) {
             consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                            std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -562,23 +582,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
 }
 
 void ClientImpl::shutdown() {
-    Lock lock(mutex_);
-    ProducersList producers;
-    ConsumersList consumers;
+    auto producers = producers_.move();
+    auto consumers = consumers_.move();
 
-    producers.swap(producers_);
-    consumers.swap(consumers_);
-    lock.unlock();
-
-    for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
-        ProducerImplBasePtr producer = it->lock();
+    for (auto&& kv : producers) {
+        ProducerImplBasePtr producer = kv.second.lock();
         if (producer) {
             producer->shutdown();
         }
     }
 
-    for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
-        ConsumerImplBasePtr consumer = it->lock();
+    for (auto&& kv : consumers) {
+        ConsumerImplBasePtr consumer = kv.second.lock();
         if (consumer) {
             consumer->shutdown();
         }
@@ -631,26 +646,24 @@ uint64_t ClientImpl::newRequestId() {
 }
 
 uint64_t ClientImpl::getNumberOfProducers() {
-    Lock lock(mutex_);
     uint64_t numberOfAliveProducers = 0;
-    for (const auto& producer : producers_) {
+    producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
         const auto& producerImpl = producer.lock();
         if (producerImpl) {
             numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
         }
-    }
+    });
     return numberOfAliveProducers;
 }
 
 uint64_t ClientImpl::getNumberOfConsumers() {
-    Lock lock(mutex_);
     uint64_t numberOfAliveConsumers = 0;
-    for (const auto& consumer : consumers_) {
+    consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
         const auto consumerImpl = consumer.lock();
         if (consumerImpl) {
             numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
         }
-    }
+    });
     return numberOfAliveConsumers;
 }
 
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index e8e7708..50ddeff 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -31,6 +31,7 @@
 #include <atomic>
 #include <vector>
 #include "ServiceNameResolver.h"
+#include "SynchronizedHashMap.h"
 
 namespace pulsar {
 
@@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
     LookupServicePtr getLookup();
+
+    void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
+
+    void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }
+
     friend class PulsarFriend;
 
    private:
@@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
     uint64_t consumerIdGenerator_;
     uint64_t requestIdGenerator_;
 
-    typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
-    ProducersList producers_;
-
-    typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
-    ConsumersList consumers_;
+    SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
+    SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
 
     std::atomic<Result> closingError;
 
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 21d439e..996df54 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -74,10 +74,10 @@ class PULSAR_PUBLIC ConnectionPool {
     typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
     PoolMap pool_;
     bool poolConnections_;
-    std::mutex mutex_;
+    mutable std::mutex mutex_;
     std::atomic_bool closed_{false};
 
-    friend class ConnectionPoolTest;
+    friend class PulsarFriend;
 };
 }  // namespace pulsar
 #endif  //_PULSAR_CONNECTION_POOL_HEADER_
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 54e346f..7be5a6a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -104,7 +104,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
 
 ConsumerImpl::~ConsumerImpl() {
     LOG_DEBUG(getName() << "~ConsumerImpl");
-    incomingMessages_.clear();
     if (state_ == Ready) {
         // this could happen at least in this condition:
         //      consumer seek, caused reconnection, if consumer close happened before connection ready,
@@ -121,6 +120,7 @@ ConsumerImpl::~ConsumerImpl() {
             LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
         }
     }
+    shutdown();
 }
 
 void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; }
@@ -156,6 +156,8 @@ void ConsumerImpl::start() {
     ackGroupingTrackerPtr_->start();
 }
 
+void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { cnx.removeConsumer(consumerId_); }
+
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
@@ -220,7 +222,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
         LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
         {
             Lock lock(mutex_);
-            connection_ = cnx;
+            setCnx(cnx);
             incomingMessages_.clear();
             state_ = Ready;
             backoff_.reset();
@@ -267,13 +269,24 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
     }
 }
 
-void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     LOG_INFO(getName() << "Unsubscribing");
 
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+            LOG_INFO(getName() << "Unsubscribed successfully");
+        } else {
+            state_ = Ready;
+            LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     if (state_ != Ready) {
         callback(ResultAlreadyClosed);
-        LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
-                               "then call unsubscribe");
         return;
     }
 
@@ -286,9 +299,9 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
         lock.unlock();
         int requestId = client->newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
+        auto self = get_shared_this_ptr();
         cnx->sendRequestWithId(cmd, requestId)
-            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
-                                   std::placeholders::_1, callback));
+            .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
     } else {
         Result result = ResultNotConnected;
         lock.unlock();
@@ -297,16 +310,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     }
 }
 
-void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
-    if (result == ResultOk) {
-        state_ = Closed;
-        LOG_INFO(getName() << "Unsubscribed successfully");
-    } else {
-        LOG_WARN(getName() << "Failed to unsubscribe: " << strResult(result));
-    }
-    callback(result);
-}
-
 Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
                                                          const proto::MessageMetadata& metadata,
                                                          const MessageId& messageId,
@@ -990,20 +993,25 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
 
 void ConsumerImpl::disconnectConsumer() {
     LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
-    Lock lock(mutex_);
-    connection_.reset();
-    lock.unlock();
+    resetCnx();
     scheduleReconnection(get_shared_this_ptr());
 }
 
-void ConsumerImpl::closeAsync(ResultCallback callback) {
-    // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = get_shared_this_ptr();
+void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        shutdown();
+        if (result == ResultOk) {
+            LOG_INFO(getName() << "Closed consumer " << consumerId_);
+        } else {
+            LOG_WARN(getName() << "Failed to close consumer: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
 
     if (state_ != Ready) {
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
@@ -1018,66 +1026,40 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
-        state_ = Closed;
         // If connection is gone, also the consumer is closed on the broker side
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     ClientImplPtr client = client_.lock();
     if (!client) {
-        state_ = Closed;
         // Client was already destroyed
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
-    int requestId = client->newRequestId();
-    Future<Result, ResponseData> future =
-        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
-    if (callback) {
-        // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
-                                     callback, ptr));
-    }
+    cancelTimers();
 
-    // fail pendingReceive callback
-    failPendingReceiveCallback();
-    failPendingBatchReceiveCallback();
-
-    // cancel timer
-    batchReceiveTimer_->cancel();
-}
-
-void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
-    if (result == ResultOk) {
-        state_ = Closed;
-
-        ClientConnectionPtr cnx = getCnx().lock();
-        if (cnx) {
-            cnx->removeConsumer(consumerId_);
-        }
-
-        LOG_INFO(getName() << "Closed consumer " << consumerId_);
-    } else {
-        LOG_ERROR(getName() << "Failed to close consumer: " << result);
-    }
-
-    if (callback) {
-        callback(result);
-    }
+    int requestId = client->newRequestId();
+    auto self = get_shared_this_ptr();
+    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
 }
 
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    state_ = Closed;
-
+    incomingMessages_.clear();
+    resetCnx();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupConsumer(this);
+    }
+    cancelTimers();
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+    state_ = Closed;
 }
 
 bool ConsumerImpl::isClosed() { return state_ == Closed; }
@@ -1437,4 +1419,9 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
     return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this());
 }
 
+void ConsumerImpl::cancelTimers() noexcept {
+    boost::system::error_code ec;
+    batchReceiveTimer_->cancel(ec);
+}
+
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 09d2c5c..3aa632a 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -84,7 +84,6 @@ class ConsumerImpl : public ConsumerImplBase {
     void activeConsumerChanged(bool isActive);
     inline proto::CommandSubscribe_SubType getSubType();
     inline proto::CommandSubscribe_InitialPosition getInitialPosition();
-    void handleUnsubscribe(Result result, ResultCallback callback);
 
     /**
      * Send individual ACK request of given message ID to broker.
@@ -140,6 +139,7 @@ class ConsumerImpl : public ConsumerImplBase {
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
+    void beforeConnectionChange(ClientConnection& cnx) override;
 
    protected:
     // overrided methods from HandlerBase
@@ -156,7 +156,8 @@ class ConsumerImpl : public ConsumerImplBase {
 
     void internalConsumerChangeListener(bool isActive);
 
-    void handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer);
+    void cancelTimers() noexcept;
+
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 506207e..1f4ce6e 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,7 +30,6 @@ namespace pulsar {
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
     : client_(client),
       topic_(topic),
-      connection_(),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -50,14 +49,25 @@ void HandlerBase::start() {
     }
 }
 
+ClientConnectionWeakPtr HandlerBase::getCnx() const {
+    Lock lock(connectionMutex_);
+    return connection_;
+}
+
+void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
+    Lock lock(connectionMutex_);
+    auto previousCnx = connection_.lock();
+    if (previousCnx) {
+        beforeConnectionChange(*previousCnx);
+    }
+    connection_ = cnx;
+}
+
 void HandlerBase::grabCnx() {
-    Lock lock(mutex_);
-    if (connection_.lock()) {
-        lock.unlock();
+    if (getCnx().lock()) {
         LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
         return;
     }
-    lock.unlock();
     LOG_INFO(getName() << "Getting connection from pool");
     ClientImplPtr client = client_.lock();
     Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
@@ -96,14 +106,14 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
 
     State state = handler->state_;
 
-    ClientConnectionPtr currentConnection = handler->connection_.lock();
+    ClientConnectionPtr currentConnection = handler->getCnx().lock();
     if (currentConnection && connection.lock().get() != currentConnection.get()) {
         LOG_WARN(handler->getName()
                  << "Ignoring connection closed since we are already attached to a newer connection");
         return;
     }
 
-    handler->connection_.reset();
+    handler->resetCnx();
 
     if (result == ResultRetryable) {
         scheduleReconnection(handler);
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 6fc3603..6616ec4 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -44,11 +44,9 @@ class HandlerBase {
 
     void start();
 
-    /*
-     * get method for derived class to access weak ptr to connection so that they
-     * have to check if they can get a shared_ptr out of it or not
-     */
-    ClientConnectionWeakPtr getCnx() const { return connection_; }
+    ClientConnectionWeakPtr getCnx() const;
+    void setCnx(const ClientConnectionPtr& cnx);
+    void resetCnx() { setCnx(nullptr); }
 
    protected:
     /*
@@ -65,6 +63,14 @@ class HandlerBase {
      * Should we retry in error that are transient
      */
     bool isRetriableError(Result result);
+
+    /**
+     * Do some cleanup work on the current connection before `connection_` is replaced.
+     *
+     * @param cnx the current connection, i.e. the one that is about to be replaced
+     */
+    virtual void beforeConnectionChange(ClientConnection& cnx) = 0;
+
     /*
      * connectionOpened will be implemented by derived class to receive notification
      */
@@ -86,7 +92,6 @@ class HandlerBase {
    protected:
     ClientImplWeakPtr client_;
     const std::string topic_;
-    ClientConnectionWeakPtr connection_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
@@ -112,6 +117,9 @@ class HandlerBase {
 
    private:
     DeadlineTimerPtr timer_;
+
+    mutable std::mutex connectionMutex_;
+    ClientConnectionWeakPtr connection_;
     friend class ClientConnection;
     friend class PulsarFriend;
 };
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 573c33d..c54f8e8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -19,6 +19,7 @@
 #include "MultiTopicsConsumerImpl.h"
 #include "MultiResultCallback.h"
 #include "MessagesImpl.h"
+#include <stdexcept>
 
 DECLARE_LOG_OBJECT()
 
@@ -55,11 +56,11 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
-    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
     if (partitionsUpdateInterval > 0) {
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client_->getLookup();
+        lookupServicePtr_ = client->getLookup();
     }
 
     state_ = Pending;
@@ -83,10 +84,16 @@ void MultiTopicsConsumerImpl::start() {
     int topicsNumber = topics_.size();
     std::shared_ptr<std::atomic<int>> topicsNeedCreate = std::make_shared<std::atomic<int>>(topicsNumber);
     // subscribe for each passed in topic
+    auto weakSelf = weak_from_this();
     for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
-        subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
-                                                           get_shared_this_ptr(), std::placeholders::_1,
-                                                           std::placeholders::_2, *itr, topicsNeedCreate));
+        auto topic = *itr;
+        subscribeOneTopicAsync(topic).addListener(
+            [this, weakSelf, topic, topicsNeedCreate](Result result, const Consumer& consumer) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    handleOneTopicSubscribed(result, consumer, topic, topicsNeedCreate);
+                }
+            });
     }
 }
 
@@ -111,9 +118,9 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
-            // It's safe to capture only this here, because the callback can be called only when this is valid
-            closeAsync(
-                [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
+            // `shutdown()`, which sets multiTopicsConsumerCreatedPromise_ with `failedResult`, will be
+            // called when `closeAsync` completes.
+            closeAsync(nullptr);
         }
     }
 }
@@ -164,10 +171,20 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                        ConsumerSubResultPromisePtr topicSubResultPromise) {
     std::shared_ptr<ConsumerImpl> consumer;
     ConsumerConfiguration config = conf_.clone();
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
+    auto client = client_.lock();
+    if (!client) {
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        return;
+    }
+    ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
 
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
-                                        std::placeholders::_1, std::placeholders::_2));
+    auto weakSelf = weak_from_this();
+    config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+        auto self = weakSelf.lock();
+        if (self) {
+            messageReceived(consumer, msg);
+        }
+    });
 
     int partitions = numPartitions == 0 ? 1 : numPartitions;
 
@@ -186,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     // non-partitioned topic
     if (numPartitions == 0) {
         // We don't have to add partition-n suffix
-        consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
+        consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
                                                   topicName->isPersistent(), internalListenerExecutor, true,
                                                   NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -199,7 +216,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     } else {
         for (int i = 0; i < numPartitions; i++) {
             std::string topicPartitionName = topicName->getTopicPartitionName(i);
-            consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+            consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
                                                       topicName->isPersistent(), internalListenerExecutor,
                                                       true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -244,12 +261,24 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
     }
 }
 
-void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+            LOG_INFO(getName() << "Unsubscribed successfully");
+        } else {
+            state_ = Ready;
+            LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     const auto state = state_.load();
     if (state == Closing || state == Closed) {
-        LOG_INFO(consumerStr_ << " already closed");
         callback(ResultAlreadyClosed);
         return;
     }
@@ -284,12 +313,9 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
 
     if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer.  - " << consumerStr_);
-        consumers_.clear();
-        topicsPartitions_.clear();
-        unAckedMessageTrackerPtr_->clear();
-
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        state_ = Closed;
+        // The `callback` is a wrapper around the user-provided callback; it is never null and calls
+        // `shutdown()` if unsubscribe succeeds.
         callback(result1);
         return;
     }
@@ -376,20 +402,27 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     }
 }
 
-void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        shutdown();
+        if (result != ResultOk) {
+            LOG_WARN(getName() << "Failed to close consumer: " << result);
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
     const auto state = state_.load();
     if (state == Closing || state == Closed) {
-        LOG_ERROR("TopicsConsumer already closed "
-                  << " topic" << topic_ << " consumer - " << consumerStr_);
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
     state_ = Closing;
 
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    cancelTimers();
+
+    auto weakSelf = weak_from_this();
     int numConsumers = 0;
     consumers_.clear(
         [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -418,27 +451,14 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
                 }
                 // closed all consumers
                 if (numConsumersLeft == 0) {
-                    incomingMessages_.clear();
-                    topicsPartitions_.clear();
-                    unAckedMessageTrackerPtr_->clear();
-
-                    if (state_ != Failed) {
-                        state_ = Closed;
-                    }
-
-                    if (callback) {
-                        callback(result);
-                    }
+                    callback(result);
                 }
             });
         });
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        state_ = Closed;
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
         return;
     }
 
@@ -461,8 +481,13 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         lock.unlock();
-        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        auto weakSelf = weak_from_this();
+        listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+            auto self = weakSelf.lock();
+            if (self) {
+                notifyPendingReceivedCallback(ResultOk, msg, callback);
+            }
+        });
         return;
     }
 
@@ -564,13 +589,18 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
+        auto weakSelf = weak_from_this();
+        listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+            auto self = weakSelf.lock();
+            if (self) {
+                notifyPendingReceivedCallback(ResultAlreadyClosed, msg, callback);
+            }
+        });
     }
     lock.unlock();
 }
 
-void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const Message& msg,
                                                             const ReceiveCallback& callback) {
     if (result == ResultOk) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -609,7 +639,7 @@ void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
     }
 }
 
-MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
 
 Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
     return multiTopicsConsumerCreatedPromise_.getFuture();
@@ -620,7 +650,24 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
-void MultiTopicsConsumerImpl::shutdown() {}
+void MultiTopicsConsumerImpl::shutdown() {
+    cancelTimers();
+    incomingMessages_.clear();
+    topicsPartitions_.clear();
+    unAckedMessageTrackerPtr_->clear();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupConsumer(this);
+    }
+    consumers_.clear();
+    topicsPartitions_.clear();
+    if (failedResult != ResultOk) {
+        multiTopicsConsumerCreatedPromise_.setFailed(failedResult);
+    } else {
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    }
+    state_ = Closed;
+}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
@@ -684,13 +731,16 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
     lock.unlock();
 
-    auto self = get_shared_this_ptr();
     size_t i = 0;
-    consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
+    consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
         size_t index = i++;
+        auto weakSelf = weak_from_this();
         consumer->getBrokerConsumerStatsAsync(
-            [self, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
-                self->handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+            [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+                }
             });
     });
 }
@@ -772,7 +822,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
 }
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
         // cannot continue at this time, and the request needs to be ignored.
@@ -790,9 +840,15 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
     for (const auto& item : topicsPartitions) {
         auto topicName = TopicName::get(item.first);
         auto currentNumPartitions = item.second;
+        auto weakSelf = weak_from_this();
         lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
-                      std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
+            [this, weakSelf, topicName, currentNumPartitions](Result result,
+                                                              const LookupDataResultPtr& lookupDataResult) {
+                auto self = weakSelf.lock();
+                if (self) {
+                    this->handleGetPartitions(topicName, result, lookupDataResult, currentNumPartitions);
+                }
+            });
     }
 }
 void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
@@ -831,9 +887,19 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     ConsumerSubResultPromisePtr topicSubResultPromise,
     std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
     ConsumerConfiguration config = conf_.clone();
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
-                                        std::placeholders::_1, std::placeholders::_2));
+    auto client = client_.lock();
+    if (!client) {
+        topicSubResultPromise->setFailed(ResultAlreadyClosed);
+        return;
+    }
+    ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
+    auto weakSelf = weak_from_this();
+    config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+        auto self = weakSelf.lock();
+        if (self) {
+            messageReceived(consumer, msg);
+        }
+    });
 
     // Apply total limit of receiver queue size across partitions
     config.setReceiverQueueSize(
@@ -842,12 +908,18 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
 
     std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);
 
-    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+    auto consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
                                                    topicName->isPersistent(), internalListenerExecutor, true,
                                                    Partitioned);
     consumer->getConsumerCreatedFuture().addListener(
-        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
-                  std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+        [this, weakSelf, partitionsNeedCreate, topicSubResultPromise](
+            Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) {
+            auto self = weakSelf.lock();
+            if (self) {
+                handleSingleConsumerCreated(result, consumerImplBaseWeakPtr, partitionsNeedCreate,
+                                            topicSubResultPromise);
+            }
+        });
     consumer->setPartitionIndex(partitionIndex);
     consumer->start();
     consumers_.emplace(topicPartitionName, consumer);
@@ -873,9 +945,13 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
         messageProcessed(peekMsg);
         messages->add(peekMsg);
     }
-    auto self = get_shared_this_ptr();
-    listenerExecutor_->postWork(
-        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+    auto weakSelf = weak_from_this();
+    listenerExecutor_->postWork([weakSelf, callback, messages]() {
+        auto self = weakSelf.lock();
+        if (self) {
+            callback(ResultOk, messages->getMessageList());
+        }
+    });
 }
 
 void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
@@ -886,3 +962,14 @@ void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
 std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
     return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
 }
+
+void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
+    throw std::runtime_error("The connection_ field should not be modified for a MultiTopicsConsumerImpl");
+}
+
+void MultiTopicsConsumerImpl::cancelTimers() noexcept {
+    if (partitionsUpdateTimer_) {
+        boost::system::error_code ec;
+        partitionsUpdateTimer_->cancel(ec);
+    }
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 044f417..7c83da9 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -87,7 +87,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
 
    protected:
-    const ClientImplPtr client_;
+    const ClientImplWeakPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
     const ConsumerConfiguration conf_;
@@ -118,7 +118,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
-    void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
+    void notifyPendingReceivedCallback(Result result, const Message& message,
+                                       const ReceiveCallback& callback);
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                   std::shared_ptr<std::atomic<int>> topicsNeedCreate);
@@ -142,10 +143,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     // impl consumer base virtual method
     bool hasEnoughMessagesForBatchReceive() const override;
     void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
+    void beforeConnectionChange(ClientConnection& cnx) override;
 
    private:
     std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
+    void cancelTimers() noexcept;
+
+    std::weak_ptr<MultiTopicsConsumerImpl> weak_from_this() noexcept {
+        return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
+    }
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 469ecc9..3d383ff 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -46,12 +46,12 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
                  (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
     conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
 
-    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
     if (partitionsUpdateInterval > 0) {
-        listenerExecutor_ = client_->getListenerExecutorProvider()->get();
+        listenerExecutor_ = client->getListenerExecutorProvider()->get();
         partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client_->getLookup();
+        lookupServicePtr_ = client->getLookup();
     }
 }
 
@@ -71,7 +71,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
     }
 }
 
-PartitionedProducerImpl::~PartitionedProducerImpl() {}
+PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }
 // override
 const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
 
@@ -86,7 +86,11 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
 
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
     using namespace std::placeholders;
-    auto producer = std::make_shared<ProducerImpl>(client_, *topicName_, conf_, partition);
+    auto client = client_.lock();
+    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
+    if (!client) {
+        return producer;
+    }
 
     if (lazy) {
         createLazyPartitionProducer(partition);
@@ -211,7 +215,15 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
 }
 
 // override
-void PartitionedProducerImpl::shutdown() { state_ = Closed; }
+void PartitionedProducerImpl::shutdown() {
+    cancelTimers();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupProducer(this);
+    }
+    partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    state_ = Closed;
+}
 
 const std::string& PartitionedProducerImpl::getProducerName() const {
     Lock producersLock(producersMutex_);
@@ -239,11 +251,25 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
  * if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to
  * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
  */
-void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
-    if (state_ == Closing || state_ == Closed) {
+void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
+    auto closeCallback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            shutdown();
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+    if (state_ == Closed) {
+        closeCallback(ResultAlreadyClosed);
+        return;
+    }
+    State expectedState = Ready;
+    if (!state_.compare_exchange_strong(expectedState, Closing)) {
         return;
     }
-    state_ = Closing;
+
+    cancelTimers();
 
     unsigned int producerAlreadyClosed = 0;
 
@@ -271,12 +297,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
      * c. If closeAsync called due to failure in creating just one sub producer then state is set by
      * handleSinglePartitionProducerCreated
      */
-    if (producerAlreadyClosed == numProducers && closeCallback) {
-        state_ = Closed;
+    if (producerAlreadyClosed == numProducers) {
         closeCallback(ResultOk);
     }
 }
 
+// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
 void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
                                                                  const unsigned int partitionIndex,
                                                                  CloseCallback callback) {
@@ -285,11 +311,9 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
         return;
     }
     if (result != ResultOk) {
-        state_ = Failed;
         LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
-        if (callback) {
-            callback(result);
-        }
+        callback(result);
+        state_ = Failed;
         return;
     }
     assert(partitionIndex < getNumPartitionsWithLock());
@@ -298,16 +322,13 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
     }
     // closed all successfully
     if (!numProducersCreated_) {
-        state_ = Closed;
         // set the producerCreatedPromise to failure, if client called
         // closeAsync and it's not failure to create producer, the promise
         // is set second time here, first time it was successful. So check
         // if there's any adverse effect of setting it again. It should not
         // be but must check. MUSTCHECK changeme
         partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
+        callback(result);
         return;
     }
 }
@@ -371,15 +392,26 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
 }
 
 void PartitionedProducerImpl::runPartitionUpdateTask() {
+    auto weakSelf = weak_from_this();
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    partitionsUpdateTimer_->async_wait(
-        std::bind(&PartitionedProducerImpl::getPartitionMetadata, shared_from_this()));
+    partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+        auto self = weakSelf.lock();
+        if (self) {
+            self->getPartitionMetadata();
+        }
+    });
 }
 
 void PartitionedProducerImpl::getPartitionMetadata() {
     using namespace std::placeholders;
+    auto weakSelf = weak_from_this();
     lookupServicePtr_->getPartitionMetadataAsync(topicName_)
-        .addListener(std::bind(&PartitionedProducerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+        .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) {
+            auto self = weakSelf.lock();
+            if (self) {
+                self->handleGetPartitions(result, lookupDataResult);
+            }
+        });
 }
 
 void PartitionedProducerImpl::handleGetPartitions(Result result,
@@ -446,4 +478,11 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
     return numberOfConnectedProducer;
 }
 
+void PartitionedProducerImpl::cancelTimers() noexcept {
+    if (partitionsUpdateTimer_) {
+        boost::system::error_code ec;
+        partitionsUpdateTimer_->cancel(ec);
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 0a8c10e..cc7a4e0 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -73,10 +73,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     void notifyResult(CloseCallback closeCallback);
 
+    std::weak_ptr<PartitionedProducerImpl> weak_from_this() noexcept { return shared_from_this(); }
+
     friend class PulsarFriend;
 
    private:
-    const ClientImplPtr client_;
+    ClientImplWeakPtr client_;
 
     const TopicNamePtr topicName_;
     const std::string topic_;
@@ -119,6 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
+    void cancelTimers() noexcept;
 };
 
 }  // namespace pulsar
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 79ed196..8014078 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -32,7 +32,7 @@ PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr cli
                               lookupServicePtr_),
       patternString_(pattern),
       pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
-      autoDiscoveryTimer_(),
+      autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
       autoDiscoveryRunning_(false) {
     namespaceName_ = TopicName::get(pattern)->getNamespaceName();
 }
@@ -215,9 +215,7 @@ void PatternMultiTopicsConsumerImpl::start() {
 
     LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
 
-    // Init autoDiscoveryTimer task only once, wait for the timeout to happen
-    if (!autoDiscoveryTimer_ && conf_.getPatternAutoDiscoveryPeriod() > 0) {
-        autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+    if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
         autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
         autoDiscoveryTimer_->async_wait(
             std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
@@ -225,13 +223,16 @@ void PatternMultiTopicsConsumerImpl::start() {
 }
 
 void PatternMultiTopicsConsumerImpl::shutdown() {
-    Lock lock(mutex_);
-    state_ = Closed;
-    autoDiscoveryTimer_->cancel();
-    multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    cancelTimers();
+    MultiTopicsConsumerImpl::shutdown();
 }
 
 void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    cancelTimers();
     MultiTopicsConsumerImpl::closeAsync(callback);
-    autoDiscoveryTimer_->cancel();
+}
+
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
+    boost::system::error_code ec;
+    autoDiscoveryTimer_->cancel(ec);
 }
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index 408d68e..448f2e3 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -72,6 +72,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
     bool autoDiscoveryRunning_;
     NamespaceNamePtr namespaceName_;
 
+    void cancelTimers() noexcept;
     void resetAutoDiscoveryTimer();
     void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
     void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback);
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 4e91ef5..65bdf23 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -38,12 +38,13 @@ void PeriodicTask::start() {
     }
 }
 
-void PeriodicTask::stop() {
+void PeriodicTask::stop() noexcept {
     State state = Ready;
     if (!state_.compare_exchange_strong(state, Closing)) {
         return;
     }
-    timer_.cancel();
+    ErrorCode ec;
+    timer_.cancel(ec);
     state_ = Pending;
 }
 
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 57d0734..159c86a 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -55,7 +55,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
 
     void start();
 
-    void stop();
+    void stop() noexcept;
 
     void setCallback(CallbackType callback) noexcept { callback_ = callback; }
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 20133c5..e228c83 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -109,7 +109,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
 
 ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
-    cancelTimers();
+    shutdown();
     printStats();
     if (state_ == Ready || state_ == Pending) {
         LOG_WARN(getName() << "Destroyed producer which was not properly closed");
@@ -124,6 +124,10 @@ int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished
 
 const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
 
+void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
+    connection.removeProducer(producerId_);
+}
+
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
@@ -185,7 +189,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
             msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
         }
         resendMessages(cnx);
-        connection_ = cnx;
+        setCnx(cnx);
         state_ = Ready;
         backoff_.reset();
         lock.unlock();
@@ -645,7 +649,19 @@ void ProducerImpl::printStats() {
     }
 }
 
-void ProducerImpl::closeAsync(CloseCallback callback) {
+void ProducerImpl::closeAsync(CloseCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        if (result == ResultOk) {
+            LOG_INFO(getName() << "Closed producer " << producerId_);
+            shutdown();
+        } else {
+            LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
+        }
+        if (originalCallback) {
+            originalCallback(result);
+        }
+    };
+
     // if the producer was never started then there is nothing to clean up
     State expectedState = NotStarted;
     if (state_.compare_exchange_strong(expectedState, Closed)) {
@@ -653,9 +669,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    // Keep a reference to ensure object is kept alive
-    ProducerImplPtr ptr = shared_from_this();
-
     cancelTimers();
 
     if (semaphore_) {
@@ -669,10 +682,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // just like Java's `getAndUpdate` method on an atomic variable
     const auto state = state_.load();
     if (state != Ready && state != Pending) {
-        state_ = Closed;
-        if (callback) {
-            callback(ResultAlreadyClosed);
-        }
+        callback(ResultAlreadyClosed);
 
         return;
     }
@@ -681,53 +691,24 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
-        state_ = Closed;
-
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     // Detach the producer from the connection to avoid sending any other
     // message from the producer
-    connection_.reset();
+    resetCnx();
 
     ClientImplPtr client = client_.lock();
     if (!client) {
-        state_ = Closed;
-        // Client was already destroyed
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
     int requestId = client->newRequestId();
-    Future<Result, ResponseData> future =
-        cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
-    if (callback) {
-        // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(
-            std::bind(&ProducerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
-    }
-}
-
-void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
-    if (result == ResultOk) {
-        state_ = Closed;
-        LOG_INFO(getName() << "Closed producer " << producerId_);
-        ClientConnectionPtr cnx = getCnx().lock();
-        if (cnx) {
-            cnx->removeProducer(producerId_);
-        }
-    } else {
-        LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
-    }
-
-    if (callback) {
-        callback(result);
-    }
+    auto self = shared_from_this();
+    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
+        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
 }
 
 Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() {
@@ -868,9 +849,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
 
 void ProducerImpl::disconnectProducer() {
     LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
-    Lock lock(mutex_);
-    connection_.reset();
-    lock.unlock();
+    resetCnx();
     scheduleReconnection(shared_from_this());
 }
 
@@ -885,16 +864,21 @@ void ProducerImpl::start() {
 }
 
 void ProducerImpl::shutdown() {
-    Lock lock(mutex_);
-    state_ = Closed;
+    resetCnx();
+    auto client = client_.lock();
+    if (client) {
+        client->cleanupProducer(this);
+    }
     cancelTimers();
     producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    state_ = Closed;
 }
 
-void ProducerImpl::cancelTimers() {
+void ProducerImpl::cancelTimers() noexcept {
     dataKeyRefreshTask_.stop();
-    batchTimer_.cancel();
-    sendTimer_.cancel();
+    boost::system::error_code ec;
+    batchTimer_.cancel(ec);
+    sendTimer_.cancel(ec);
 }
 
 bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 74eee61..0559515 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -109,6 +109,7 @@ class ProducerImpl : public HandlerBase,
     friend class BatchMessageContainer;
 
     // overrided methods from HandlerBase
+    void beforeConnectionChange(ClientConnection& connection) override;
     void connectionOpened(const ClientConnectionPtr& connection) override;
     void connectionFailed(Result result) override;
     HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
@@ -120,8 +121,6 @@ class ProducerImpl : public HandlerBase,
     void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
                               const ResponseData& responseData);
 
-    void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
-
     void resendMessages(ClientConnectionPtr cnx);
 
     void refreshEncryptionKey(const boost::system::error_code& ec);
@@ -143,7 +142,7 @@ class ProducerImpl : public HandlerBase,
     void releaseSemaphore(uint32_t payloadSize);
     void releaseSemaphoreForSendOp(const OpSendMsg& op);
 
-    void cancelTimers();
+    void cancelTimers() noexcept;
 
     bool isValidProducerState(const SendCallback& callback) const;
     bool canAddToBatch(const Message& msg) const;
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index 831d1e8..9bed7d7 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -74,12 +74,9 @@ class SynchronizedHashMap {
 
     // clear the map and apply `f` on each removed value
     void clear(std::function<void(const K&, const V&)> f) {
-        Lock lock(mutex_);
-        auto it = data_.begin();
-        while (it != data_.end()) {
-            f(it->first, it->second);
-            auto next = data_.erase(it);
-            it = next;
+        MapType data = move();
+        for (auto&& kv : data) {
+            f(kv.first, kv.second);
         }
     }
 
@@ -131,8 +128,15 @@ class SynchronizedHashMap {
         return data_.size();
     }
 
+    MapType move() noexcept {
+        Lock lock(mutex_);
+        MapType data;
+        data_.swap(data);
+        return data;
+    }
+
    private:
-    std::unordered_map<K, V> data_;
+    MapType data_;
     // Use recursive_mutex to allow methods being called in `forEach`
     mutable MutexType mutex_;
 };
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 216b548..aa48bdc 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -20,7 +20,6 @@
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
-#include "WaitUtils.h"
 
 #include <future>
 #include <pulsar/Client.h>
@@ -198,37 +197,34 @@ TEST(ClientTest, testReferenceCount) {
         Producer producer;
         ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
         ASSERT_EQ(producers.size(), 1);
-        ASSERT_TRUE(producers[0].use_count() > 0);
-        LOG_INFO("Reference count of the producer: " << producers[0].use_count());
+
+        producers.forEachValue([](const ProducerImplBaseWeakPtr &weakProducer) {
+            LOG_INFO("Reference count of producer: " << weakProducer.use_count());
+            ASSERT_FALSE(weakProducer.expired());
+        });
 
         Consumer consumer;
         ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
         ASSERT_EQ(consumers.size(), 1);
-        ASSERT_TRUE(consumers[0].use_count() > 0);
-        LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
 
         ReaderConfiguration readerConf;
         Reader reader;
         ASSERT_EQ(ResultOk,
                   client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
         ASSERT_EQ(consumers.size(), 2);
-        ASSERT_TRUE(consumers[1].use_count() > 0);
-        LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+        consumers.forEachValue([](const ConsumerImplBaseWeakPtr &weakConsumer) {
+            LOG_INFO("Reference count of consumer: " << weakConsumer.use_count());
+            ASSERT_FALSE(weakConsumer.expired());
+        });
 
         readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
         ASSERT_TRUE(readerWeakPtr.use_count() > 0);
         LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
-    ASSERT_EQ(producers.size(), 1);
-    ASSERT_EQ(producers[0].use_count(), 0);
-    ASSERT_EQ(consumers.size(), 2);
-
-    waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
-        return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
-    });
-    EXPECT_EQ(consumers[0].use_count(), 0);
-    EXPECT_EQ(consumers[1].use_count(), 0);
+    EXPECT_EQ(producers.size(), 0);
+    EXPECT_EQ(consumers.size(), 0);
     EXPECT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index d9f9923..df8e3dc 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -98,14 +98,45 @@ class PulsarFriend {
 
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
 
-    static ClientImpl::ProducersList& getProducers(const Client& client) {
+    static auto getProducers(const Client& client) -> decltype(ClientImpl::producers_)& {
         return getClientImplPtr(client)->producers_;
     }
 
-    static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+    static auto getConsumers(const Client& client) -> decltype(ClientImpl::consumers_)& {
         return getClientImplPtr(client)->consumers_;
     }
 
+    static std::vector<ClientConnectionPtr> getConnections(const Client& client) {
+        auto& pool = client.impl_->pool_;
+        std::vector<ClientConnectionPtr> connections;
+        std::lock_guard<std::mutex> lock(pool.mutex_);
+        for (const auto& kv : pool.pool_) {
+            auto cnx = kv.second.lock();
+            if (cnx) {
+                connections.emplace_back(cnx);
+            }
+        }
+        return connections;
+    }
+
+    static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
+        std::vector<ProducerImplPtr> producers;
+        std::lock_guard<std::mutex> lock(cnx.mutex_);
+        for (const auto& kv : cnx.producers_) {
+            producers.emplace_back(kv.second.lock());
+        }
+        return producers;
+    }
+
+    static std::vector<ConsumerImplPtr> getConsumers(const ClientConnection& cnx) {
+        std::vector<ConsumerImplPtr> consumers;
+        std::lock_guard<std::mutex> lock(cnx.mutex_);
+        for (const auto& kv : cnx.consumers_) {
+            consumers.emplace_back(kv.second.lock());
+        }
+        return consumers;
+    }
+
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
         consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
     }
diff --git a/tests/ShutdownTest.cc b/tests/ShutdownTest.cc
new file mode 100644
index 0000000..e32a95c
--- /dev/null
+++ b/tests/ShutdownTest.cc
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <atomic>
+#include <ctime>
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include "lib/ClientImpl.h"
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+enum class EndToEndType : uint8_t
+{
+    SINGLE_TOPIC,
+    MULTI_TOPICS,
+    REGEX_TOPICS
+};
+
+class ShutdownTest : public ::testing::TestWithParam<EndToEndType> {
+   public:
+    void SetUp() override {
+        topic_ = topic_ + std::to_string(id_++) + "-" + std::to_string(time(nullptr));
+        if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+            int res = makePutRequest(
+                "http://localhost:8080/admin/v2/persistent/public/default/" + topic_ + "/partitions", "2");
+            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+        }
+    }
+
+   protected:
+    Client client_{lookupUrl};
+    decltype(PulsarFriend::getProducers(client_)) producers_{PulsarFriend::getProducers(client_)};
+    decltype(PulsarFriend::getConsumers(client_)) consumers_{PulsarFriend::getConsumers(client_)};
+    std::string topic_ = "shutdown-test-";
+
+    static std::atomic_int id_;
+
+    Result subscribe(Consumer &consumer) {
+        if (GetParam() == EndToEndType::REGEX_TOPICS) {
+            // NOTE: Currently the regex subscription requires the complete namespace prefix
+            return client_.subscribeWithRegex("persistent://public/default/" + topic_ + ".*", "sub",
+                                              consumer);
+        } else {
+            return client_.subscribe(topic_, "sub", consumer);
+        }
+    }
+
+    void assertConnectionsEmpty() {
+        auto connections = PulsarFriend::getConnections(client_);
+        for (const auto &cnx : PulsarFriend::getConnections(client_)) {
+            EXPECT_TRUE(PulsarFriend::getProducers(*cnx).empty());
+            EXPECT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
+        }
+    }
+};
+
+std::atomic_int ShutdownTest::id_{0};
+
+TEST_P(ShutdownTest, testClose) {
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+    EXPECT_EQ(producers_.size(), 1);
+    ASSERT_EQ(ResultOk, producer.close());
+    EXPECT_EQ(producers_.size(), 0);
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, subscribe(consumer));
+    EXPECT_EQ(consumers_.size(), 1);
+    ASSERT_EQ(ResultOk, consumer.close());
+    EXPECT_EQ(consumers_.size(), 0);
+
+    ASSERT_EQ(ResultOk, subscribe(consumer));
+    EXPECT_EQ(consumers_.size(), 1);
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    EXPECT_EQ(consumers_.size(), 0);
+
+    assertConnectionsEmpty();
+    ASSERT_EQ(ResultOk, client_.close());
+}
+
+TEST_P(ShutdownTest, testDestructor) {
+    {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+        EXPECT_EQ(producers_.size(), 1);
+    }
+    EXPECT_EQ(producers_.size(), 0);
+
+    {
+        Consumer consumer;
+        ASSERT_EQ(ResultOk, subscribe(consumer));
+        EXPECT_EQ(consumers_.size(), 1);
+    }
+    EXPECT_EQ(consumers_.size(), 0);
+
+    assertConnectionsEmpty();
+    client_.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ShutdownTest,
+                         ::testing::Values(EndToEndType::SINGLE_TOPIC, EndToEndType::MULTI_TOPICS,
+                                           EndToEndType::REGEX_TOPICS));
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
deleted file mode 100644
index abe3efc..0000000
--- a/tests/WaitUtils.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <chrono>
-#include <functional>
-#include <thread>
-
-namespace pulsar {
-
-template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
-    auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
-    while (timeoutMs > 0) {
-        auto now = std::chrono::high_resolution_clock::now();
-        if (condition()) {
-            break;
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
-        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
-                           std::chrono::high_resolution_clock::now() - now)
-                           .count();
-        timeoutMs -= elapsed;
-    }
-}
-
-}  // namespace pulsar


[pulsar-client-cpp] 06/06: Fixed the release artifacts package file name (#113)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit f70aa89d1ac0c012d0dc472e1c53462834dfb517
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 20 14:20:32 2022 -0700

    Fixed the release artifacts package file name (#113)
---
 .github/workflows/ci-build-binary-artifacts.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci-build-binary-artifacts.yaml b/.github/workflows/ci-build-binary-artifacts.yaml
index f2a931e..7c0a001 100644
--- a/.github/workflows/ci-build-binary-artifacts.yaml
+++ b/.github/workflows/ci-build-binary-artifacts.yaml
@@ -75,5 +75,5 @@ jobs:
       - name: Upload artifacts
         uses: actions/upload-artifact@v3
         with:
-          name: ${{matrix.pkg.type}}-${{matrix.pkg.platform}}
+          name: ${{matrix.pkg.type}}-${{matrix.cpu.platform}}
           path: ${{matrix.pkg.path}}


[pulsar-client-cpp] 01/06: Use parametrized job to build packages (#36)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit a6716404a120f606a9ba6e81db80ddfcad2272b0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 12 19:30:35 2022 -0700

    Use parametrized job to build packages (#36)
    
    Consolidated all the RPM/DEB/APK jobs into a single matrix
---
 .github/workflows/ci-build-binary-artifacts.yaml | 213 +++--------------------
 .github/workflows/ci-pr-validation.yaml          |  89 +++-------
 2 files changed, 45 insertions(+), 257 deletions(-)

diff --git a/.github/workflows/ci-build-binary-artifacts.yaml b/.github/workflows/ci-build-binary-artifacts.yaml
index 46a9cf7..21e13db 100644
--- a/.github/workflows/ci-build-binary-artifacts.yaml
+++ b/.github/workflows/ci-build-binary-artifacts.yaml
@@ -29,188 +29,21 @@ concurrency:
 
 jobs:
 
-  deb-packaging-x86_64:
-    name: Build Debian Package - x86_64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/deb
-          load: true
-          tags: build-deb-x86_64:latest
-          build-args: PLATFORM=x86_64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build Debian packages
-        run: pkg/deb/docker-build-deb-x86_64.sh build-deb-x86_64:latest
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: deb-x86_64
-          path: pkg/deb/BUILD/DEB
-
-  deb-packaging-arm64:
-    name: Build Debian Package - Arm64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v1
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/deb
-          load: true
-          tags: build-deb-arm64:latest
-          build-args: PLATFORM=aarch64
-          platforms: linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build Debian packages
-        run: pkg/deb/docker-build-deb-arm64.sh build-deb-arm64:latest
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: deb-arm64
-          path: pkg/deb/BUILD/DEB
-
-  rpm-packaging-x86_64:
-    name: Build RPM Package - x86_64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/rpm
-          load: true
-          tags: build-rpm-x86_64:latest
-          build-args: PLATFORM=x86_64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build RPM packages
-        run: pkg/rpm/docker-build-rpm-x86_64.sh build-rpm-x86_64:latest
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: rpm-x86_64
-          path: pkg/rpm/RPMS
-
-  rpm-packaging-arm64:
-    name: Build RPM Package - arm64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v1
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/rpm
-          load: true
-          tags: build-rpm-arm64:latest
-          build-args: PLATFORM=aarch64
-          platforms: linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build RPM packages
-        run: pkg/rpm/docker-build-rpm-arm64.sh build-rpm-arm64:latest
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: rpm-arm64
-          path: pkg/rpm/RPMS
-
-  apk-packaging-x86_64:
-    name: Build Alpine Linux APK Package - x86_64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/apk
-          load: true
-          tags: build-apk-x86_64:latest
-          build-args: PLATFORM=x86_64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build APK packages
-        run: pkg/apk/docker-build-apk-x86_64.sh build-apk-x86_64:latest
-
-      - name: Upload artifacts
-        uses: actions/upload-artifact@v3
-        with:
-          name: apk-x86_64
-          path: pkg/apk/build/x86_64
-
-  apk-packaging-arm64:
-    name: Build Alpine Linux APK Package - arm64
-    runs-on: ubuntu-20.04
-    timeout-minutes: 300
+  package:
+    name: Build ${{matrix.pkg.name}} ${{matrix.cpu.platform}}
+    runs-on: ubuntu-22.04
+    timeout-minutes: 500
+
+    strategy:
+      fail-fast: false
+      matrix:
+        pkg:
+          - { name: 'RPM', type: 'rpm', path: 'pkg/rpm/RPMS' }
+          - { name: 'Deb', type: 'deb', path: 'pkg/deb/BUILD/DEB' }
+          - { name: 'Alpine', type: 'apk', path: 'pkg/apk/build' }
+        cpu:
+          - { arch: 'x86_64', platform: 'x86_64' }
+          - { arch: 'aarch64', platform: 'arm64' }
 
     steps:
       - name: checkout
@@ -228,19 +61,19 @@ jobs:
       - name: Build dependencies Docker image
         uses: docker/build-push-action@v3
         with:
-          context: ./pkg/apk
+          context: ./pkg/${{matrix.pkg.type}}
           load: true
-          tags: build-apk-arm64:latest
-          build-args: PLATFORM=aarch64
-          platforms: linux/arm64
+          tags: build:latest
+          platforms: linux/${{matrix.cpu.platform}}
+          build-args: PLATFORM=${{matrix.cpu.arch}}
           cache-from: type=gha
           cache-to: type=gha,mode=max
 
-      - name: Build APK packages
-        run: pkg/apk/docker-build-apk-arm64.sh build-apk-arm64:latest
+      - name: Build packages
+        run: pkg/${{matrix.pkg.type}}/docker-build-${{matrix.pkg.type}}-${{matrix.cpu.platform}}.sh build:latest
 
       - name: Upload artifacts
         uses: actions/upload-artifact@v3
         with:
-          name: apk-arm64
-          path: pkg/apk/build/aarch64
+          name: ${{matrix.pkg.type}}-${{matrix.pkg.platform}}
+          path: ${{matrix.pkg.path}}
diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 229e6fa..b4b9655 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -176,75 +176,28 @@ jobs:
             cmake --build ./build-1 --parallel --config Release
           fi
 
-  deb-packaging:
-    name: Build Debian Package
-    runs-on: ubuntu-20.04
+  package:
+    name: Build ${{matrix.pkg.name}} ${{matrix.cpu.platform}}
+    runs-on: ubuntu-22.04
     needs: unit-tests
-    timeout-minutes: 120
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
-
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
+    timeout-minutes: 500
 
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/deb
-          load: true
-          tags: build-deb-x86_64:latest
-          build-args: PLATFORM=x86_64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build Debian packages
-        run: pkg/deb/docker-build-deb-x86_64.sh build-deb-x86_64:latest
-
-
-  rpm-packaging:
-    name: Build RPM Package
-    runs-on: ubuntu-20.04
-    needs: unit-tests
-    timeout-minutes: 120
+    strategy:
+      fail-fast: true
+      matrix:
+        pkg:
+          - { name: 'RPM', type: 'rpm', path: 'pkg/rpm/RPMS' }
+          - { name: 'Deb', type: 'deb', path: 'pkg/deb/BUILD/DEB' }
+          - { name: 'Alpine', type: 'apk', path: 'pkg/apk/build' }
+        cpu:
+          - { arch: 'x86_64', platform: 'x86_64' }
 
     steps:
       - name: checkout
         uses: actions/checkout@v2
 
-      - name: Package Pulsar source
-        run: build-support/generate-source-archive.sh
-
-      - uses: docker/setup-buildx-action@v2
-      - run: build-support/copy-deps-versionfile.sh
-
-      - name: Build dependencies Docker image
-        uses: docker/build-push-action@v3
-        with:
-          context: ./pkg/rpm
-          load: true
-          tags: build-rpm-x86_64:latest
-          build-args: PLATFORM=x86_64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
-
-      - name: Build RPM packages
-        run: pkg/rpm/docker-build-rpm-x86_64.sh build-rpm-x86_64:latest
-
-
-  apk-packaging:
-    name: Build Alpine Linux APK Package
-    runs-on: ubuntu-20.04
-    needs: unit-tests
-    timeout-minutes: 120
-
-    steps:
-      - name: checkout
-        uses: actions/checkout@v2
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v1
 
       - name: Package Pulsar source
         run: build-support/generate-source-archive.sh
@@ -255,12 +208,14 @@ jobs:
       - name: Build dependencies Docker image
         uses: docker/build-push-action@v3
         with:
-          context: ./pkg/apk
+          context: ./pkg/${{matrix.pkg.type}}
           load: true
-          tags: build-apk-x86_64:latest
-          build-args: PLATFORM=x86_64
+          tags: build:latest
+          platforms: linux/${{matrix.cpu.platform}}
+          build-args: PLATFORM=${{matrix.cpu.arch}}
           cache-from: type=gha
           cache-to: type=gha,mode=max
 
-      - name: Build APK packages
-        run: pkg/apk/docker-build-apk-x86_64.sh build-apk-x86_64:latest
+      - name: Build packages
+        run: pkg/${{matrix.pkg.type}}/docker-build-${{matrix.pkg.type}}-${{matrix.cpu.platform}}.sh build:latest
+


[pulsar-client-cpp] 05/06: [flaky-test] Fix very flaky tests for TEST_P (#59)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit b617a7f844548a12660c5f251cfbbffafef036b1
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Oct 21 11:55:13 2022 +0800

    [flaky-test] Fix very flaky tests for TEST_P (#59)
    
    Fixes #58 #24
    
    ### Motivation
    
    gtest-parallel runs tests in different processes, not threads, so the
    static test counter restarts in every process. Two tests can therefore
    get the same topic name even if it has the timestamp suffix, and a
    `ConsumerBusy` error would then occur.
    
    ### Modifications
    
    In each `TEST_P` method, convert `GetParam()` to a unique string and
    include it in the topic name to avoid topic conflicts.
---
 tests/ProducerTest.cc |  6 ++++--
 tests/ShutdownTest.cc | 54 +++++++++++++++++++++++++++++----------------------
 2 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index d351ee9..36b23ee 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -219,7 +219,8 @@ class ProducerTest : public ::testing::TestWithParam<bool> {};
 TEST_P(ProducerTest, testMaxMessageSize) {
     Client client(serviceUrl);
 
-    const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr));
+    const auto topic = std::string("ProducerTest-NoBatchMaxMessageSize-") +
+                       (GetParam() ? "batch-" : "-no-batch-") + std::to_string(time(nullptr));
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
@@ -247,7 +248,8 @@ TEST_P(ProducerTest, testMaxMessageSize) {
 TEST_P(ProducerTest, testChunkingMaxMessageSize) {
     Client client(serviceUrl);
 
-    const std::string topic = "ProducerTest-ChunkingMaxMessageSize-" + std::to_string(time(nullptr));
+    const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") +
+                       (GetParam() ? "batch-" : "no-batch-") + std::to_string(time(nullptr));
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
diff --git a/tests/ShutdownTest.cc b/tests/ShutdownTest.cc
index e32a95c..d9a9c23 100644
--- a/tests/ShutdownTest.cc
+++ b/tests/ShutdownTest.cc
@@ -35,60 +35,66 @@ enum class EndToEndType : uint8_t
     REGEX_TOPICS
 };
 
-class ShutdownTest : public ::testing::TestWithParam<EndToEndType> {
-   public:
-    void SetUp() override {
-        topic_ = topic_ + std::to_string(id_++) + "-" + std::to_string(time(nullptr));
-        if (GetParam() != EndToEndType::SINGLE_TOPIC) {
-            int res = makePutRequest(
-                "http://localhost:8080/admin/v2/persistent/public/default/" + topic_ + "/partitions", "2");
-            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-        }
+static std::string toString(EndToEndType endToEndType) {
+    switch (endToEndType) {
+        case EndToEndType::SINGLE_TOPIC:
+            return "single-topic";
+        case EndToEndType::MULTI_TOPICS:
+            return "multi-topics";
+        case EndToEndType::REGEX_TOPICS:
+            return "regex-topics";
+        default:
+            return "???";
     }
+}
 
+class ShutdownTest : public ::testing::TestWithParam<EndToEndType> {
    protected:
     Client client_{lookupUrl};
     decltype(PulsarFriend::getProducers(client_)) producers_{PulsarFriend::getProducers(client_)};
     decltype(PulsarFriend::getConsumers(client_)) consumers_{PulsarFriend::getConsumers(client_)};
-    std::string topic_ = "shutdown-test-";
 
-    static std::atomic_int id_;
+    void createPartitionedTopic(const std::string& topic) {
+        if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+            int res = makePutRequest(
+                "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", "2");
+            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+        }
+    }
 
-    Result subscribe(Consumer &consumer) {
+    Result subscribe(Consumer& consumer, const std::string& topic) {
         if (GetParam() == EndToEndType::REGEX_TOPICS) {
             // NOTE: Currently the regex subscription requires the complete namespace prefix
-            return client_.subscribeWithRegex("persistent://public/default/" + topic_ + ".*", "sub",
-                                              consumer);
+            return client_.subscribeWithRegex("persistent://public/default/" + topic + ".*", "sub", consumer);
         } else {
-            return client_.subscribe(topic_, "sub", consumer);
+            return client_.subscribe(topic, "sub", consumer);
         }
     }
 
     void assertConnectionsEmpty() {
         auto connections = PulsarFriend::getConnections(client_);
-        for (const auto &cnx : PulsarFriend::getConnections(client_)) {
+        for (const auto& cnx : PulsarFriend::getConnections(client_)) {
             EXPECT_TRUE(PulsarFriend::getProducers(*cnx).empty());
             EXPECT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
         }
     }
 };
 
-std::atomic_int ShutdownTest::id_{0};
-
 TEST_P(ShutdownTest, testClose) {
+    std::string topic = "shutdown-test-close-" + toString(GetParam()) + "-" + std::to_string(time(nullptr));
     Producer producer;
-    ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
     EXPECT_EQ(producers_.size(), 1);
     ASSERT_EQ(ResultOk, producer.close());
     EXPECT_EQ(producers_.size(), 0);
 
     Consumer consumer;
-    ASSERT_EQ(ResultOk, subscribe(consumer));
+    ASSERT_EQ(ResultOk, subscribe(consumer, topic));
     EXPECT_EQ(consumers_.size(), 1);
     ASSERT_EQ(ResultOk, consumer.close());
     EXPECT_EQ(consumers_.size(), 0);
 
-    ASSERT_EQ(ResultOk, subscribe(consumer));
+    ASSERT_EQ(ResultOk, subscribe(consumer, topic));
     EXPECT_EQ(consumers_.size(), 1);
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
     EXPECT_EQ(consumers_.size(), 0);
@@ -98,16 +104,18 @@ TEST_P(ShutdownTest, testClose) {
 }
 
 TEST_P(ShutdownTest, testDestructor) {
+    std::string topic =
+        "shutdown-test-destructor-" + toString(GetParam()) + "-" + std::to_string(time(nullptr));
     {
         Producer producer;
-        ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+        ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
         EXPECT_EQ(producers_.size(), 1);
     }
     EXPECT_EQ(producers_.size(), 0);
 
     {
         Consumer consumer;
-        ASSERT_EQ(ResultOk, subscribe(consumer));
+        ASSERT_EQ(ResultOk, subscribe(consumer, topic));
         EXPECT_EQ(consumers_.size(), 1);
     }
     EXPECT_EQ(consumers_.size(), 0);