You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/10/18 02:33:47 UTC

[pulsar-client-cpp] branch main updated: [feat] Consumer support batch receive messages. (#21)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f7653b  [feat] Consumer support batch receive messages. (#21)
7f7653b is described below

commit 7f7653b694996f38cf422d8bef24aa1fdfb21c59
Author: Baodi Shi <ba...@icloud.com>
AuthorDate: Tue Oct 18 10:33:41 2022 +0800

    [feat] Consumer support batch receive messages. (#21)
    
    ### Motivation
    https://github.com/apache/pulsar/issues/17140
    
    This PR has been reviewed in [pulsar repo](https://github.com/apache/pulsar/pull/17429).
    
    ### Modifications
    - Consumer support batch receives messages.
    - Abstract common implementation to `ConsumerImplBase`.
---
 include/pulsar/BatchReceivePolicy.h    |  90 ++++++++++++++++
 include/pulsar/Consumer.h              |  25 +++++
 include/pulsar/ConsumerConfiguration.h |  18 ++++
 lib/BatchReceivePolicy.cc              |  57 +++++++++++
 lib/BatchReceivePolicyImpl.h           |  29 ++++++
 lib/ClientImpl.h                       |   5 +-
 lib/Consumer.cc                        |  18 ++++
 lib/ConsumerConfiguration.cc           |   9 ++
 lib/ConsumerConfigurationImpl.h        |   1 +
 lib/ConsumerImpl.cc                    | 174 +++++++++++++++++--------------
 lib/ConsumerImpl.h                     |  15 +--
 lib/ConsumerImplBase.cc                | 141 +++++++++++++++++++++++++
 lib/ConsumerImplBase.h                 |  45 +++++++-
 lib/HandlerBase.h                      |   1 +
 lib/MessagesImpl.cc                    |  58 +++++++++++
 lib/MessagesImpl.h                     |  46 +++++++++
 lib/MultiTopicsConsumerImpl.cc         | 142 ++++++++++++++++++--------
 lib/MultiTopicsConsumerImpl.h          |  22 ++--
 tests/BasicEndToEndTest.cc             | 181 +++++++++++++++++++++++++++++++++
 tests/BatchReceivePolicyTest.cc        |  40 ++++++++
 tests/ConsumerConfigurationTest.cc     |   8 ++
 tests/MessagesImplTest.cc              |  61 +++++++++++
 22 files changed, 1037 insertions(+), 149 deletions(-)

diff --git a/include/pulsar/BatchReceivePolicy.h b/include/pulsar/BatchReceivePolicy.h
new file mode 100644
index 0000000..3c66da2
--- /dev/null
+++ b/include/pulsar/BatchReceivePolicy.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef BATCH_RECEIVE_POLICY_HPP_
+#define BATCH_RECEIVE_POLICY_HPP_
+
+#include <pulsar/defines.h>
+#include <memory>
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link
+ * Consumer#batchReceiveAsync()}.
+ *
+ * <p>Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a
+ * timeout for waiting for enough messages for this batch.
+ *
+ * <p>A batch receive action is completed as long as any one of the
+ * conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met.
+ *
+ * <p>Examples:
+ * 1.If set maxNumMessages = 10, maxSizeOfMessages = 1MB and without timeout, it
+ * means {@link Consumer#batchReceive()} will always wait until there is enough messages.
+ * 2.If set maxNumberOfMessages = 0, maxNumBytes = 0 and timeout = 100ms, it
+ * means {@link Consumer#batchReceive()} will wait for 100ms no matter whether there are enough messages.
+ *
+ * <p>Note:
+ * Must specify messages limitation(maxNumMessages, maxNumBytes) or wait timeout.
+ * Otherwise, {@link Messages} ingest {@link Message} will never end.
+ *
+ * @since 2.4.1
+ */
+class PULSAR_PUBLIC BatchReceivePolicy {
+   public:
+    /**
+     * Default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+     */
+    BatchReceivePolicy();
+
+    /**
+     *
+     * @param maxNumMessage  Max num message, if less than 0, it means no limit.
+     * @param maxNumBytes Max num bytes, if less than 0, it means no limit.
+     * @param timeoutMs If less than 0, it means no limit.
+     */
+    BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs);
+
+    /**
+     * Get max time out ms.
+     *
+     * @return
+     */
+    long getTimeoutMs() const;
+
+    /**
+     * Get the maximum number of messages.
+     * @return
+     */
+    int getMaxNumMessages() const;
+
+    /**
+     * Get max num bytes.
+     * @return
+     */
+    long getMaxNumBytes() const;
+
+   private:
+    std::shared_ptr<BatchReceivePolicyImpl> impl_;
+};
+}  // namespace pulsar
+
+#endif /* BATCH_RECEIVE_POLICY_HPP_ */
diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index 6c0ab27..c7911b9 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -113,6 +113,31 @@ class PULSAR_PUBLIC Consumer {
      */
     void receiveAsync(ReceiveCallback callback);
 
+    /**
+     * Batch receiving messages.
+     *
+     * <p>This calls blocks until has enough messages or wait timeout, more details to see {@link
+     * BatchReceivePolicy}.
+     *
+     * @param msgs a non-const reference where the received messages will be copied
+     * @return ResultOk when a message is received
+     * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+     */
+    Result batchReceive(Messages& msgs);
+
+    /**
+     * Async Batch receiving messages.
+     * <p>
+     * Retrieves a message when it will be available and completes callback with received message.
+     * </p>
+     * <p>
+     * batchReceiveAsync() should be called subsequently once callback gets completed with received message.
+     * Else it creates <i> backlog of receive requests </i> in the application.
+     * </p>
+     * @param BatchReceiveCallback will be completed when messages are available.
+     */
+    void batchReceiveAsync(BatchReceiveCallback callback);
+
     /**
      * Acknowledge the reception of a single message.
      *
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 4347c3b..13d5cc0 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -31,6 +31,7 @@
 #include <pulsar/InitialPosition.h>
 #include <pulsar/KeySharedPolicy.h>
 #include <pulsar/ConsumerEventListener.h>
+#include "BatchReceivePolicy.h"
 
 namespace pulsar {
 
@@ -38,8 +39,10 @@ class Consumer;
 class PulsarWrapper;
 
 /// Callback definition for non-data operation
+typedef std::vector<Message> Messages;
 typedef std::function<void(Result result)> ResultCallback;
 typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
+typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
 typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /// Callback definition for MessageListener
@@ -378,6 +381,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     InitialPosition getSubscriptionInitialPosition() const;
 
+    /**
+     * Set batch receive policy.
+     *
+     * @param batchReceivePolicy the default is
+     * {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+     */
+    void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
+
+    /**
+     * Get batch receive policy.
+     *
+     * @return batch receive policy
+     */
+    const BatchReceivePolicy& getBatchReceivePolicy() const;
+
     /**
      * Set whether the subscription status should be replicated.
      * The default value is `false`.
diff --git a/lib/BatchReceivePolicy.cc b/lib/BatchReceivePolicy.cc
new file mode 100644
index 0000000..08aa368
--- /dev/null
+++ b/lib/BatchReceivePolicy.cc
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/BatchReceivePolicy.h>
+#include "BatchReceivePolicyImpl.h"
+#include "LogUtils.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+DECLARE_LOG_OBJECT()
+
+BatchReceivePolicy::BatchReceivePolicy() : BatchReceivePolicy(-1, 10 * 1024 * 1024, 100) {}
+
+BatchReceivePolicy::BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs)
+    : impl_(std::make_shared<BatchReceivePolicyImpl>()) {
+    if (maxNumMessage <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {
+        throw std::invalid_argument(
+            "At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
+    }
+    if (maxNumMessage <= 0 && maxNumBytes <= 0) {
+        impl_->maxNumMessage = -1;
+        impl_->maxNumBytes = 10 * 1024 * 1024;
+        LOG_WARN(
+            "BatchReceivePolicy maxNumMessages and maxNumBytes is less than 0. Reset to default: "
+            "maxNumMessage(-1), maxNumBytes(10 * 1024 * 10)");
+    } else {
+        impl_->maxNumMessage = maxNumMessage;
+        impl_->maxNumBytes = maxNumBytes;
+    }
+    impl_->timeoutMs = timeoutMs;
+}
+
+long BatchReceivePolicy::getTimeoutMs() const { return impl_->timeoutMs; }
+
+int BatchReceivePolicy::getMaxNumMessages() const { return impl_->maxNumMessage; }
+
+long BatchReceivePolicy::getMaxNumBytes() const { return impl_->maxNumBytes; }
+
+}  // namespace pulsar
diff --git a/lib/BatchReceivePolicyImpl.h b/lib/BatchReceivePolicyImpl.h
new file mode 100644
index 0000000..e7ba431
--- /dev/null
+++ b/lib/BatchReceivePolicyImpl.h
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl {
+    int maxNumMessage;
+    long maxNumBytes;
+    long timeoutMs;
+};
+
+}  // namespace pulsar
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 466461a..e8e7708 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -28,14 +28,12 @@
 #include <mutex>
 #include <lib/TopicName.h>
 #include "ProducerImplBase.h"
-#include "ConsumerImplBase.h"
 #include <atomic>
 #include <vector>
 #include "ServiceNameResolver.h"
 
 namespace pulsar {
 
-class ClientImpl;
 class PulsarFriend;
 typedef std::shared_ptr<ClientImpl> ClientImplPtr;
 typedef std::weak_ptr<ClientImpl> ClientImplWeakPtr;
@@ -44,6 +42,9 @@ class ReaderImpl;
 typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
 typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 
+class ConsumerImplBase;
+typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
+
 std::string generateRandomName();
 
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
diff --git a/lib/Consumer.cc b/lib/Consumer.cc
index 5d16362..13fb9f4 100644
--- a/lib/Consumer.cc
+++ b/lib/Consumer.cc
@@ -82,6 +82,24 @@ void Consumer::receiveAsync(ReceiveCallback callback) {
     impl_->receiveAsync(callback);
 }
 
+Result Consumer::batchReceive(Messages& msgs) {
+    if (!impl_) {
+        return ResultConsumerNotInitialized;
+    }
+    Promise<Result, Messages> promise;
+    impl_->batchReceiveAsync(WaitForCallbackValue<Messages>(promise));
+    return promise.getFuture().get(msgs);
+}
+
+void Consumer::batchReceiveAsync(BatchReceiveCallback callback) {
+    if (!impl_) {
+        Messages msgs;
+        callback(ResultConsumerNotInitialized, msgs);
+        return;
+    }
+    impl_->batchReceiveAsync(callback);
+}
+
 Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
 
 Result Consumer::acknowledge(const MessageId& messageId) {
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index f9fe499..0705cca 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -19,6 +19,7 @@
 #include <lib/ConsumerConfigurationImpl.h>
 
 #include <stdexcept>
+#include <pulsar/ConsumerConfiguration.h>
 
 namespace pulsar {
 
@@ -267,4 +268,12 @@ ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool st
 
 bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; }
 
+void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) {
+    impl_->batchReceivePolicy = batchReceivePolicy;
+}
+
+const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
+    return impl_->batchReceivePolicy;
+}
+
 }  // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index cca83a3..444fedf 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl {
     ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
     bool readCompacted{false};
     InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
+    BatchReceivePolicy batchReceivePolicy{};
     int patternAutoDiscoveryPeriod{60};
     bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 37fcd95..54e346f 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -18,6 +18,7 @@
  */
 #include "ConsumerImpl.h"
 #include "MessageImpl.h"
+#include "MessagesImpl.h"
 #include "Commands.h"
 #include "LogUtils.h"
 #include "TimeUtils.h"
@@ -43,7 +44,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
                            Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
-    : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
+    : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+                       listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
       waitingForZeroQueueSizeMessage(false),
       config_(conf),
       subscription_(subscriptionName),
@@ -85,13 +87,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
 
-    // Initialize listener executor.
-    if (listenerExecutor) {
-        listenerExecutor_ = listenerExecutor;
-    } else {
-        listenerExecutor_ = client->getListenerExecutorProvider()->get();
-    }
-
     // Setup stats reporter.
     unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
     if (statsIntervalInSeconds) {
@@ -145,12 +140,12 @@ const std::string& ConsumerImpl::getTopic() const { return topic_; }
 void ConsumerImpl::start() {
     HandlerBase::start();
 
-    // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+    // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
     // constructor completed.
     if (TopicName::get(topic_)->isPersistent()) {
         if (config_.getAckGroupingTimeMs() > 0) {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+                client_.lock(), get_shared_this_ptr(), consumerId_, config_.getAckGroupingTimeMs(),
                 config_.getAckGroupingMaxSize()));
         } else {
             ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
@@ -169,7 +164,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
 
     // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
     // sending the subscribe request.
-    cnx->registerConsumer(consumerId_, shared_from_this());
+    cnx->registerConsumer(consumerId_, get_shared_this_ptr());
 
     if (duringSeek_) {
         ackGroupingTrackerPtr_->flushAndClean();
@@ -195,13 +190,13 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
         config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
         config_.getKeySharedPolicy(), config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
-        .addListener(
-            std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
+        .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
+                               std::placeholders::_1));
 }
 
 void ConsumerImpl::connectionFailed(Result result) {
     // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = shared_from_this();
+    auto ptr = get_shared_this_ptr();
 
     if (consumerCreatedPromise_.setFailed(result)) {
         state_ = Failed;
@@ -244,7 +239,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
                 sendFlowPermitsToBroker(cnx, 1);
             }
         }
-        consumerCreatedPromise_.setValue(shared_from_this());
+        consumerCreatedPromise_.setValue(get_shared_this_ptr());
     } else {
         if (result == ResultTimeout) {
             // Creating the consumer has timed out. We need to ensure the broker closes the consumer
@@ -257,12 +252,12 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
         if (consumerCreatedPromise_.isComplete()) {
             // Consumer had already been initially created, we need to retry connecting in any case
             LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
-            scheduleReconnection(shared_from_this());
+            scheduleReconnection(get_shared_this_ptr());
         } else {
             // Consumer was not yet created, retry to connect to broker if it's possible
             if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
                 LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result));
-                scheduleReconnection(shared_from_this());
+                scheduleReconnection(get_shared_this_ptr());
             } else {
                 LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
                 consumerCreatedPromise_.setFailed(result);
@@ -292,7 +287,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
         int requestId = client->newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
         cnx->sendRequestWithId(cmd, requestId)
-            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, shared_from_this(),
+            .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
                                    std::placeholders::_1, callback));
     } else {
         Result result = ResultNotConnected;
@@ -460,34 +455,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
                                 << startMessageId.value());
             return;
         }
-
-        Lock lock(pendingReceiveMutex_);
-        // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
-        bool asyncReceivedWaiting = !pendingReceives_.empty();
-        ReceiveCallback callback;
-        if (asyncReceivedWaiting) {
-            callback = pendingReceives_.front();
-            pendingReceives_.pop();
-        }
-        lock.unlock();
-
-        if (asyncReceivedWaiting) {
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                  shared_from_this(), ResultOk, m, callback));
-            return;
-        }
-
-        // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
-        if (config_.getReceiverQueueSize() != 0 ||
-            (config_.getReceiverQueueSize() == 0 && messageListener_)) {
-            incomingMessages_.push(m);
-        } else {
-            Lock lock(mutex_);
-            if (waitingForZeroQueueSizeMessage) {
-                lock.unlock();
-                incomingMessages_.push(m);
-            }
-        }
+        executeNotifyCallback(m);
     }
 
     if (messageListener_) {
@@ -496,7 +464,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         }
         // Trigger message listener callback in a separate thread
         while (numOfMessageReceived--) {
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+            listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
         }
     }
 }
@@ -504,16 +472,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
 void ConsumerImpl::activeConsumerChanged(bool isActive) {
     if (eventListener_) {
         listenerExecutor_->postWork(
-            std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive));
+            std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive));
     }
 }
 
 void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
     try {
         if (isActive) {
-            eventListener_->becameActive(Consumer(shared_from_this()), partitionIndex_);
+            eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_);
         } else {
-            eventListener_->becameInactive(Consumer(shared_from_this()), partitionIndex_);
+            eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_);
         }
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
@@ -527,11 +495,56 @@ void ConsumerImpl::failPendingReceiveCallback() {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void ConsumerImpl::executeNotifyCallback(Message& msg) {
+    Lock lock(pendingReceiveMutex_);
+    // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+    bool asyncReceivedWaiting = !pendingReceives_.empty();
+    ReceiveCallback callback;
+    if (asyncReceivedWaiting) {
+        callback = pendingReceives_.front();
+        pendingReceives_.pop();
+    }
+    lock.unlock();
+
+    // has pending receive, direct callback.
+    if (asyncReceivedWaiting) {
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
+
+    // try to add incoming messages.
+    // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
+    if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
+        incomingMessages_.push(msg);
+        incomingMessagesSize_.fetch_add(msg.getLength());
+    }
+
+    // try trigger pending batch messages
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
+        messageProcessed(peekMsg);
+        messages->add(peekMsg);
+    }
+    auto self = get_shared_this_ptr();
+    listenerExecutor_->postWork(
+        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
 void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
                                                  const ReceiveCallback& callback) {
     if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
@@ -573,19 +586,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
             }
         }
 
-        //
-        Lock lock(pendingReceiveMutex_);
-        if (!pendingReceives_.empty()) {
-            ReceiveCallback callback = pendingReceives_.front();
-            pendingReceives_.pop();
-            lock.unlock();
-            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                  shared_from_this(), ResultOk, msg, callback));
-        } else {
-            // Regular path, append individual message to incoming messages queue
-            incomingMessages_.push(msg);
-            lock.unlock();
-        }
+        executeNotifyCallback(msg);
     }
 
     if (skippedMessages > 0) {
@@ -698,7 +699,7 @@ void ConsumerImpl::internalListener() {
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessageId_ = msg.getMessageId();
-        messageListener_(Consumer(shared_from_this()), msg);
+        messageListener_(Consumer(get_shared_this_ptr()), msg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
     }
@@ -721,9 +722,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
         incomingMessages_.clear();
     }
-    Lock localLock(mutex_);
     waitingForZeroQueueSizeMessage = true;
-    localLock.unlock();
 
     sendFlowPermitsToBroker(currentCnx, 1);
 
@@ -745,7 +744,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
             }
         }
     }
-    return ResultOk;
 }
 
 Result ConsumerImpl::receive(Message& msg) {
@@ -837,6 +835,8 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
     lastDequedMessageId_ = msg.getMessageId();
     lock.unlock();
 
+    incomingMessagesSize_.fetch_sub(msg.getLength());
+
     ClientConnectionPtr currentCnx = getCnx().lock();
     if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
         LOG_DEBUG(getName() << "Not adding permit since connection is different.");
@@ -934,7 +934,7 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
 }
 
 void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
-    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
                                   callback, proto::CommandAck_AckType_Individual);
     if (msgId.batchIndex() != -1 &&
         !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
@@ -945,7 +945,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
 }
 
 void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
                                   callback, proto::CommandAck_AckType_Cumulative);
     if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
         cb(ResultCumulativeAcknowledgementNotAllowedError);
@@ -993,12 +993,12 @@ void ConsumerImpl::disconnectConsumer() {
     Lock lock(mutex_);
     connection_.reset();
     lock.unlock();
-    scheduleReconnection(shared_from_this());
+    scheduleReconnection(get_shared_this_ptr());
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
     // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = shared_from_this();
+    ConsumerImplPtr ptr = get_shared_this_ptr();
 
     if (state_ != Ready) {
         if (callback) {
@@ -1041,12 +1041,16 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
     if (callback) {
         // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
-        future.addListener(
-            std::bind(&ConsumerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
+        future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
+                                     callback, ptr));
     }
 
     // fail pendingReceive callback
     failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+
+    // cancel timer
+    batchReceiveTimer_->cancel();
 }
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
@@ -1102,7 +1106,7 @@ Result ConsumerImpl::resumeMessageListener() {
 
     for (size_t i = 0; i < count; i++) {
         // Trigger message listener callback in a separate thread
-        listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
     }
     // Check current permits and determine whether to send FLOW command
     this->increaseAvailablePermits(getCnx().lock(), 0);
@@ -1167,7 +1171,7 @@ void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callb
                                 << ", requestId - " << requestId);
 
             cnx->newConsumerStats(consumerId_, requestId)
-                .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(),
+                .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(),
                                        std::placeholders::_1, std::placeholders::_2, callback));
             return;
         } else {
@@ -1303,7 +1307,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
             LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
                                 << ", requestId - " << requestId);
 
-            auto self = shared_from_this();
+            auto self = get_shared_this_ptr();
             cnx->newGetLastMessageId(consumerId_, requestId)
                 .addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
                     if (result == ResultOk) {
@@ -1384,7 +1388,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
         LOG_INFO(getName() << " Seeking subscription to " << seekId);
     }
 
-    std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
 
     cnx->sendRequestWithId(seek, requestId)
         .addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
@@ -1419,4 +1423,18 @@ bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
                                                : idx <= startMessageId_.get().value().entryId();
 }
 
+bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) {
+        return false;
+    }
+    return (batchReceivePolicy_.getMaxNumMessages() > 0 &&
+            incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) ||
+           (batchReceivePolicy_.getMaxNumBytes() > 0 &&
+            incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes());
+}
+
+std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
+    return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this());
+}
+
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 1ad3a4c..09d2c5c 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -65,9 +65,7 @@ enum ConsumerTopicType
     Partitioned
 };
 
-class ConsumerImpl : public ConsumerImplBase,
-                     public HandlerBase,
-                     public std::enable_shared_from_this<ConsumerImpl> {
+class ConsumerImpl : public ConsumerImplBase {
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
                  const ConsumerConfiguration&, bool isPersistent,
@@ -147,7 +145,10 @@ class ConsumerImpl : public ConsumerImplBase,
     // overrided methods from HandlerBase
     void connectionOpened(const ClientConnectionPtr& cnx) override;
     void connectionFailed(Result result) override;
-    HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+    // impl methods from ConsumerImpl base
+    bool hasEnoughMessagesForBatchReceive() const override;
+    void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
 
     void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
 
@@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
-    bool waitingForZeroQueueSizeMessage;
+    std::atomic_bool waitingForZeroQueueSizeMessage;
+    std::shared_ptr<ConsumerImpl> get_shared_this_ptr();
     bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData,
                                    const proto::MessageMetadata& metadata, SharedBuffer& payload,
                                    bool checkMaxMessageSize);
@@ -180,6 +182,7 @@ class ConsumerImpl : public ConsumerImplBase,
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+    void executeNotifyCallback(Message& msg);
     void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
     void failPendingReceiveCallback();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
@@ -199,13 +202,13 @@ class ConsumerImpl : public ConsumerImplBase,
     const bool isPersistent_;
     MessageListener messageListener_;
     ConsumerEventListenerPtr eventListener_;
-    ExecutorServicePtr listenerExecutor_;
     bool hasParent_;
     ConsumerTopicType consumerTopicType_;
 
     const Commands::SubscriptionMode subscriptionMode_;
 
     UnboundedBlockingQueue<Message> incomingMessages_;
+    std::atomic_int incomingMessagesSize_ = {0};
     std::queue<ReceiveCallback> pendingReceives_;
     std::atomic_int availablePermits_;
     const int receiverQueueRefillThreshold_;
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
new file mode 100644
index 0000000..4a8c027
--- /dev/null
+++ b/lib/ConsumerImplBase.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs;
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            Lock batchOptionLock(batchReceiveOptionMutex_);
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchOptionLock.unlock();
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        listenerExecutor_->postWork(
+            [opBatchReceive]() { opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {}); });
+    }
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        lock.unlock();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        notifyBatchPendingReceivedCallback(callback);
+        batchOptionLock.unlock();
+    } else {
+        OpBatchReceive opBatchReceive(callback);
+        Lock lock(batchPendingReceiveMutex_);
+        batchPendingReceives_.emplace(opBatchReceive);
+        lock.unlock();
+        triggerBatchReceiveTimerTask(batchReceivePolicy_.getTimeoutMs());
+    }
+}
+
+OpBatchReceive::OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback)
+    : batchReceiveCallback_(batchReceiveCallback), createAt_(TimeUtils::currentTimeMillis()) {}
+
+} /* namespace pulsar */
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 693d4da..18b8bc1 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const int64_t createAt_;
+};
+
+class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {
+   public:
+    virtual ~ConsumerImplBase(){};
+    ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                     const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor);
+
+    // interface by consumer
     virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() = 0;
-    virtual const std::string& getSubscriptionName() const = 0;
     virtual const std::string& getTopic() const = 0;
+    virtual const std::string& getSubscriptionName() const = 0;
     virtual Result receive(Message& msg) = 0;
     virtual Result receive(Message& msg, int timeout) = 0;
     virtual void receiveAsync(ReceiveCallback& callback) = 0;
+    void batchReceiveAsync(BatchReceiveCallback callback);
     virtual void unsubscribeAsync(ResultCallback callback) = 0;
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -49,7 +64,6 @@ class ConsumerImplBase {
     virtual Result resumeMessageListener() = 0;
     virtual void redeliverUnacknowledgedMessages() = 0;
     virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
-    virtual const std::string& getName() const = 0;
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -57,6 +71,27 @@ class ConsumerImplBase {
     virtual void negativeAcknowledge(const MessageId& msgId) = 0;
     virtual bool isConnected() const = 0;
     virtual uint64_t getNumberOfConnectedConsumer() = 0;
+    // overrided methods from HandlerBase
+    virtual const std::string& getName() const override = 0;
+
+   protected:
+    // overrided methods from HandlerBase
+    void connectionOpened(const ClientConnectionPtr& cnx) override {}
+    void connectionFailed(Result result) override {}
+    HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+    // consumer impl generic method.
+    ExecutorServicePtr listenerExecutor_;
+    std::queue<OpBatchReceive> batchPendingReceives_;
+    BatchReceivePolicy batchReceivePolicy_;
+    DeadlineTimerPtr batchReceiveTimer_;
+    std::mutex batchReceiveOptionMutex_;
+    void triggerBatchReceiveTimerTask(long timeoutMs);
+    void doBatchReceiveTimeTask();
+    void failPendingBatchReceiveCallback();
+    void notifyBatchPendingReceivedCallback();
+    virtual void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) = 0;
+    virtual bool hasEnoughMessagesForBatchReceive() const = 0;
 
    private:
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 1184746..6fc3603 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -90,6 +90,7 @@ class HandlerBase {
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
+    std::mutex batchPendingReceiveMutex_;
     ptime creationTimestamp_;
 
     const TimeDuration operationTimeut_;
diff --git a/lib/MessagesImpl.cc b/lib/MessagesImpl.cc
new file mode 100644
index 0000000..7d45cdd
--- /dev/null
+++ b/lib/MessagesImpl.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentSizeOfMessages_(0) {}
+
+const std::vector<Message>& MessagesImpl::getMessageList() const { return messageList_; }
+
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (messageList_.size() == 0) {
+        return true;
+    }
+
+    if (maxNumberOfMessages_ > 0 && messageList_.size() + 1 > maxNumberOfMessages_) {
+        return false;
+    }
+
+    if (maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_) {
+        return false;
+    }
+
+    return true;
+}
+
+void MessagesImpl::add(const Message& message) {
+    if (!canAdd(message)) {
+        throw std::invalid_argument("No more space to add messages.");
+    }
+    currentSizeOfMessages_ += message.getLength();
+    messageList_.emplace_back(message);
+}
+
+int MessagesImpl::size() const { return messageList_.size(); }
+
+void MessagesImpl::clear() {
+    currentSizeOfMessages_ = 0;
+    messageList_.clear();
+}
diff --git a/lib/MessagesImpl.h b/lib/MessagesImpl.h
new file mode 100644
index 0000000..0c12768
--- /dev/null
+++ b/lib/MessagesImpl.h
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MESSAGESIMPL_H
+#define PULSAR_CPP_MESSAGESIMPL_H
+
+#include <vector>
+#include <pulsar/Message.h>
+
+using namespace pulsar;
+
+namespace pulsar {
+
+class MessagesImpl {
+   public:
+    MessagesImpl(const int maxNumberOfMessages, const long maxSizeOfMessages);
+    const std::vector<Message>& getMessageList() const;
+    bool canAdd(const Message& message) const;
+    void add(const Message& message);
+    int size() const;
+    void clear();
+
+   private:
+    std::vector<Message> messageList_;
+    const int maxNumberOfMessages_;
+    const long maxSizeOfMessages_;
+    long currentSizeOfMessages_;
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_CPP_MESSAGESIMPL_H
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 0d730e1..573c33d 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -18,6 +18,7 @@
  */
 #include "MultiTopicsConsumerImpl.h"
 #include "MultiResultCallback.h"
+#include "MessagesImpl.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -27,12 +28,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
                                                  const std::string& subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& conf,
                                                  LookupServicePtr lookupServicePtr)
-    : client_(client),
+    : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
+                       Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+                       client->getListenerExecutorProvider()->get()),
+      client_(client),
       subscriptionName_(subscriptionName),
-      topic_(topicName ? topicName->toString() : "EmptyTopics"),
       conf_(conf),
-      messages_(conf.getReceiverQueueSize()),
-      listenerExecutor_(client->getListenerExecutorProvider()->get()),
+      incomingMessages_(conf.getReceiverQueueSize()),
       messageListener_(conf.getMessageListener()),
       lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
@@ -59,14 +61,16 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
         partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
         lookupServicePtr_ = client_->getLookup();
     }
+
+    state_ = Pending;
 }
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        MultiTopicsConsumerState state = Pending;
+        State state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
-            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
             return;
         } else {
             LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
@@ -81,7 +85,7 @@ void MultiTopicsConsumerImpl::start() {
     // subscribe for each passed in topic
     for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
         subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
-                                                           shared_from_this(), std::placeholders::_1,
+                                                           get_shared_this_ptr(), std::placeholders::_1,
                                                            std::placeholders::_2, *itr, topicsNeedCreate));
     }
 }
@@ -100,10 +104,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     }
 
     if (--(*topicsNeedCreate) == 0) {
-        MultiTopicsConsumerState state = Pending;
+        State state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
-            multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+            multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned consumers
@@ -162,7 +166,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
     ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
 
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
                                         std::placeholders::_1, std::placeholders::_2));
 
     int partitions = numPartitions == 0 ? 1 : numPartitions;
@@ -186,8 +190,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                   topicName->isPersistent(), internalListenerExecutor, true,
                                                   NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
-            &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
-            std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+            &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
+            std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
         consumers_.emplace(topicName->toString(), consumer);
         LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
         consumer->start();
@@ -199,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
                                                       topicName->isPersistent(), internalListenerExecutor,
                                                       true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
-                &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+                &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                 std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
             consumer->setPartitionIndex(i);
             consumers_.emplace(topicPartitionName, consumer);
@@ -236,7 +240,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
-        topicSubResultPromise->setValue(Consumer(shared_from_this()));
+        topicSubResultPromise->setValue(Consumer(get_shared_this_ptr()));
     }
 }
 
@@ -252,7 +256,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     state_ = Closing;
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
-    auto self = shared_from_this();
+    auto self = get_shared_this_ptr();
     int numConsumers = 0;
     consumers_.forEachValue(
         [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
@@ -329,7 +333,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         }
 
         optConsumer.value()->unsubscribeAsync(
-            std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(),
+            std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, get_shared_this_ptr(),
                       std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName,
                       callback));
     }
@@ -385,7 +389,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     state_ = Closing;
 
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
     int numConsumers = 0;
     consumers_.clear(
         [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -414,7 +418,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
                 }
                 // closed all consumers
                 if (numConsumersLeft == 0) {
-                    messages_.clear();
+                    incomingMessages_.clear();
                     topicsPartitions_.clear();
                     unAckedMessageTrackerPtr_->clear();
 
@@ -440,6 +444,10 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     // fail pending receive
     failPendingReceiveCallback();
+    failPendingBatchReceiveCallback();
+
+    // cancel timer
+    batchReceiveTimer_->cancel();
 }
 
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
@@ -454,25 +462,39 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         pendingReceives_.pop();
         lock.unlock();
         listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultOk, msg, callback));
-    } else {
-        if (messages_.full()) {
-            lock.unlock();
-        }
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
 
-        if (messages_.push(msg) && messageListener_) {
-            listenerExecutor_->postWork(
-                std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
-        }
+    if (incomingMessages_.full()) {
+        lock.unlock();
+    }
+
+    // add message to block queue.
+    // when messages queue is full, will block listener thread on ConsumerImpl,
+    // then will not send permits to broker, will broker stop push message.
+    incomingMessages_.push(msg);
+    incomingMessagesSize_.fetch_add(msg.getLength());
+
+    // try trigger pending batch messages
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+    batchOptionLock.unlock();
+
+    if (messageListener_) {
+        listenerExecutor_->postWork(
+            std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer));
     }
 }
 
 void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
     Message m;
-    messages_.pop(m);
-    unAckedMessageTrackerPtr_->add(m.getMessageId());
+    incomingMessages_.pop(m);
     try {
-        messageListener_(Consumer(shared_from_this()), m);
+        messageListener_(Consumer(get_shared_this_ptr()), m);
+        messageProcessed(m);
     } catch (const std::exception& e) {
         LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
     }
@@ -487,9 +509,9 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    messages_.pop(msg);
+    incomingMessages_.pop(msg);
+    messageProcessed(msg);
 
-    unAckedMessageTrackerPtr_->add(msg.getMessageId());
     return ResultOk;
 }
 
@@ -503,8 +525,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
+        messageProcessed(msg);
         return ResultOk;
     } else {
         if (state_ != Ready) {
@@ -524,9 +546,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     }
 
     Lock lock(pendingReceiveMutex_);
-    if (messages_.pop(msg, std::chrono::milliseconds(0))) {
+    if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        messageProcessed(msg);
         callback(ResultOk, msg);
     } else {
         pendingReceives_.push(callback);
@@ -536,14 +558,14 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     Message msg;
 
-    messages_.close();
+    incomingMessages_.close();
 
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
@@ -649,7 +671,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
     });
 }
 
-int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
     if (state_ != Ready) {
@@ -662,7 +684,7 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
     lock.unlock();
 
-    auto self = shared_from_this();
+    auto self = get_shared_this_ptr();
     size_t i = 0;
     consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
         size_t index = i++;
@@ -750,7 +772,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
 }
 void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
     partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
     partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
         // cannot continue at this time, and the request needs to be ignored.
@@ -769,7 +791,7 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
         auto topicName = TopicName::get(item.first);
         auto currentNumPartitions = item.second;
         lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName,
+            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
                       std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
     }
 }
@@ -810,7 +832,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
     ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
                                         std::placeholders::_1, std::placeholders::_2));
 
     // Apply total limit of receiver queue size across partitions
@@ -824,7 +846,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
                                                    topicName->isPersistent(), internalListenerExecutor, true,
                                                    Partitioned);
     consumer->getConsumerCreatedFuture().addListener(
-        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                   std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
     consumer->setPartitionIndex(partitionIndex);
     consumer->start();
@@ -832,3 +854,35 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
                                             << " consumerSize: " << consumers_.size());
 }
+
+bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) {
+        return false;
+    }
+    return (batchReceivePolicy_.getMaxNumMessages() > 0 &&
+            incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) ||
+           (batchReceivePolicy_.getMaxNumBytes() > 0 &&
+            incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes());
+}
+
+void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
+        messageProcessed(peekMsg);
+        messages->add(peekMsg);
+    }
+    auto self = get_shared_this_ptr();
+    listenerExecutor_->postWork(
+        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
+void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
+    incomingMessagesSize_.fetch_sub(msg.getLength());
+    unAckedMessageTrackerPtr_->add(msg.getMessageId());
+}
+
+std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
+    return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 8769d59..044f417 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -38,17 +38,8 @@ namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
 
 class MultiTopicsConsumerImpl;
-class MultiTopicsConsumerImpl : public ConsumerImplBase,
-                                public std::enable_shared_from_this<MultiTopicsConsumerImpl> {
+class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
-    enum MultiTopicsConsumerState
-    {
-        Pending,
-        Ready,
-        Closing,
-        Closed,
-        Failed
-    };
     MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr topicName,
                             const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
@@ -99,16 +90,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     const ClientImplPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
-    std::string topic_;
     const ConsumerConfiguration conf_;
     typedef SynchronizedHashMap<std::string, ConsumerImplPtr> ConsumerMap;
     ConsumerMap consumers_;
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    std::atomic<MultiTopicsConsumerState> state_{Pending};
-    BlockingQueue<Message> messages_;
-    const ExecutorServicePtr listenerExecutor_;
+    BlockingQueue<Message> incomingMessages_;
+    std::atomic_int incomingMessagesSize_ = {0};
     MessageListener messageListener_;
     DeadlineTimerPtr partitionsUpdateTimer_;
     boost::posix_time::time_duration partitionsUpdateInterval_;
@@ -125,6 +114,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
                                               unsigned int partitionIndex);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
+    void messageProcessed(Message& msg);
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
@@ -149,8 +139,12 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
                                     ConsumerSubResultPromisePtr topicSubResultPromise,
                                     std::shared_ptr<std::atomic<int>> partitionsNeedCreate);
+    // impl consumer base virtual method
+    bool hasEnoughMessagesForBatchReceive() const override;
+    void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
 
    private:
+    std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 54a07bb..d3e424e 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -4098,3 +4098,184 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultAlreadyClosed);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    consumer.close();
+    client.close();
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }
diff --git a/tests/BatchReceivePolicyTest.cc b/tests/BatchReceivePolicyTest.cc
new file mode 100644
index 0000000..ab9ffcc
--- /dev/null
+++ b/tests/BatchReceivePolicyTest.cc
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/BatchReceivePolicy.h>
+
+using namespace pulsar;
+
+TEST(BatchReceivePolicyTest, testBatchReceivePolicy) {
+    ASSERT_THROW(BatchReceivePolicy(-1, -1, -1), std::invalid_argument);
+
+    {
+        BatchReceivePolicy batchReceivePolicy;
+        ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+        ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 100);
+    }
+
+    {
+        BatchReceivePolicy batchReceivePolicy(-1, -1, 123);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+        ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+        ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 123);
+    }
+}
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 24f541b..20cd8f4 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -61,6 +61,9 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
     ASSERT_EQ(conf.getPriorityLevel(), 0);
     ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10);
     ASSERT_EQ(conf.isAutoAckOldestChunkedMessageOnQueueFull(), false);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), -1);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
 }
 
 TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -151,6 +154,11 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
 
     conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
     ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull());
+
+    conf.setBatchReceivePolicy(BatchReceivePolicy(10, 10, 100));
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), 10);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10);
+    ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
 }
 
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
diff --git a/tests/MessagesImplTest.cc b/tests/MessagesImplTest.cc
new file mode 100644
index 0000000..e963501
--- /dev/null
+++ b/tests/MessagesImplTest.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test not limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        ASSERT_THROW(messages.add(msg), std::invalid_argument);
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+
+    // 2. test max size of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(-1, 10);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        ASSERT_THROW(messages.add(msg), std::invalid_argument);
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+}