You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/07 11:26:49 UTC

[GitHub] [pulsar-client-cpp] shibd opened a new pull request, #21: [feat][cpp] Consumer support batch receive messages.

shibd opened a new pull request, #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21

   ### Motivation
   https://github.com/apache/pulsar/issues/17140
   
   ### Modifications
   - Consumer supports batch-receiving messages.
   - Abstract common implementation to `ConsumerImplBase`.
   
   ### Documentation
   
   - [x] `doc` 
   
   ### Matching PR in forked repository
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992306708


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#issuecomment-1272316750

   CI for this PR has passed: https://github.com/shibd/pulsar-client-cpp/pull/1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r991979143


##########
lib/ConsumerImplBase.h:
##########
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const long createAt_;

Review Comment:
   ```suggestion
       const int64_t createAt_;
   ```
   
   To make the type definition consistent with what `TimeUtils::currentTimeMillis` returns.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();

Review Comment:
   ```suggestion
           notifyBatchPendingReceivedCallback(callback);
   ```
   
   IIUC, `batchPendingReceiveMutex_` is used to make the accesses to `batchPendingReceives_` thread safe. We don't need the lock here.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();
+    } else {
+        // expectmoreIncomingMessages();

Review Comment:
   ```suggestion
   ```



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```



##########
lib/MessagesImpl.cc:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentNumberOfMessages_(0),
+      currentSizeOfMessages_(0) {
+    messageList_ = std::vector<Message>();
+}
+
+const std::vector<Message>& MessagesImpl::getMessageList() const { return messageList_; }
+
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (currentNumberOfMessages_ == 0) {
+        return true;
+    }
+
+    if (maxNumberOfMessages_ > 0 && currentNumberOfMessages_ + 1 > maxNumberOfMessages_) {
+        return false;
+    }
+
+    if (maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_) {
+        return false;
+    }
+
+    return true;
+}
+
+void MessagesImpl::add(const Message& message) {
+    if (!canAdd(message)) {
+        throw std::invalid_argument("No more space to add messages.");
+    }
+    currentNumberOfMessages_++;

Review Comment:
   Though the Java implementation adds this field to represent the size of `messageList`, I think it's redundant. Could you remove this field and use `messageList_.size()` instead?



##########
tests/MessagesImplTest.cc:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test not limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }
+
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+
+    // 2. test max size of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(-1, 10);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }

Review Comment:
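   Use `ASSERT_THROW` instead of the `try`/`catch` block with `FAIL()`, e.g. `ASSERT_THROW(messages.add(msg), std::invalid_argument);`.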



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   Could you explain why you set these properties?



##########
lib/ConsumerImpl.cc:
##########
@@ -527,11 +495,58 @@ void ConsumerImpl::failPendingReceiveCallback() {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void ConsumerImpl::executeNotifyCallback(Message& msg) {
+    Lock lock(pendingReceiveMutex_);
+    // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+    bool asyncReceivedWaiting = !pendingReceives_.empty();
+    ReceiveCallback callback;
+    if (asyncReceivedWaiting) {
+        callback = pendingReceives_.front();
+        pendingReceives_.pop();
+    }
+    lock.unlock();
+
+    // has pending receive, direct callback.
+    if (asyncReceivedWaiting) {
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
+
+    // try to add incoming messages.
+    // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
+    if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
+        incomingMessages_.push(msg);
+        incomingMessagesSize_.fetch_add(msg.getLength());
+    }
+
+    // try trigger pending batch messages
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.peek(peekMsg) && messages->canAdd(peekMsg)) {
+        // decreaseIncomingMessageSize
+        Message msg;
+        incomingMessages_.pop(msg);
+        messageProcessed(msg);
+        messages->add(msg);

Review Comment:
   ```suggestion
       while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
           messageProcessed(peekMsg);
           messages->add(peekMsg);
       }
   ```
   
   The combination of `peek` and `pop` is not atomic; we should use `pop` with a zero timeout, as we did in `ConsumerImpl::receiveAsync`.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });

Review Comment:
   There is no need to capture `self` because the lambda's body never accesses any field or method of `ConsumerImplBase`. Declaring a default-constructed `Messages` object and capturing it is also odd, and we don't need to unlock explicitly.
   
   ```c++
       Lock lock(batchPendingReceiveMutex_);
       while (!batchPendingReceives_.empty()) {
           OpBatchReceive opBatchReceive = batchPendingReceives_.front();
           batchPendingReceives_.pop();
           listenerExecutor_->postWork([opBatchReceive]() {
               opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {});
           });
       }
   ```



##########
lib/MessagesImpl.cc:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentNumberOfMessages_(0),
+      currentSizeOfMessages_(0) {
+    messageList_ = std::vector<Message>();
+}

Review Comment:
   ```suggestion
         currentSizeOfMessages_(0) {}
   ```
   
   The default constructor of `std::vector` already creates an empty vector, so this assignment is redundant.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);

Review Comment:
   ```suggestion
           lock.unlock();
           notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
   ```



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.

Review Comment:
   ```suggestion
   ```
   
   These comments seem to be copied from somewhere else.



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```
   
   Debug logs in tests are meaningless.



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   The same question as I asked before.



##########
tests/MessagesImplTest.cc:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test not limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }

Review Comment:
   Use `ASSERT_THROW`; see `KeySharedPolicyTest` for an example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992294010


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.

Review Comment:
   What I wanted to express is that `!ec` needs to be checked.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992310766


##########
lib/ConsumerImpl.cc:
##########
@@ -527,11 +495,58 @@ void ConsumerImpl::failPendingReceiveCallback() {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void ConsumerImpl::executeNotifyCallback(Message& msg) {
+    Lock lock(pendingReceiveMutex_);
+    // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+    bool asyncReceivedWaiting = !pendingReceives_.empty();
+    ReceiveCallback callback;
+    if (asyncReceivedWaiting) {
+        callback = pendingReceives_.front();
+        pendingReceives_.pop();
+    }
+    lock.unlock();
+
+    // has pending receive, direct callback.
+    if (asyncReceivedWaiting) {
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              get_shared_this_ptr(), ResultOk, msg, callback));
+        return;
+    }
+
+    // try to add incoming messages.
+    // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
+    if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
+        incomingMessages_.push(msg);
+        incomingMessagesSize_.fetch_add(msg.getLength());
+    }
+
+    // try trigger pending batch messages
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.peek(peekMsg) && messages->canAdd(peekMsg)) {
+        // decreaseIncomingMessageSize
+        Message msg;
+        incomingMessages_.pop(msg);
+        messageProcessed(msg);
+        messages->add(msg);

Review Comment:
   Good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r993125648


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.

Review Comment:
   Actually, this pattern is widely used in the code base.
   
   ```c++
   async_wait([](const boost::system::error_code& ec) { if (!ec) { /* do sth... */ } });
   ```
   
   Either adding the comment or leaving it out is okay. However, if you do add a comment, please make sure it is clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r991019314


##########
lib/ConsumerImpl.h:
##########
@@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
-    bool waitingForZeroQueueSizeMessage;
+    volatile bool waitingForZeroQueueSizeMessage;

Review Comment:
   Thanks for the reminder. The intent here was to ensure visibility across threads. I looked into it, and `volatile` in C++ does not guarantee that.
   
   I replaced it with `atomic_bool`, PTAL.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992403519


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();

Review Comment:
   Yes, you are right. `batchPendingReceiveMutex_` is only used to make accesses to `batchPendingReceives_` thread safe. I mixed it up.
   
   A lock is needed here to make `hasEnoughMessagesForBatchReceive` and `notifyBatchPendingReceivedCallback` execute atomically, which prevents users from receiving a batch of messages that does not comply with the policy.
   
   I added a new `batchReceiveOptionMutex_` lock to synchronize them.
   
   However, this is not entirely guaranteed, because I didn't synchronize the scenario where a single-message receive takes a message out of `incomingQueue_`.
   
   In the Java client implementation, these operations are all performed on the `internalPinnedExecutor` thread, so there is no problem there.
   
   I think it can be done this way first, and then I'll try to refactor it so that C++ also uses an `internal thread` to avoid adding various locks.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower merged pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r996857527


##########
lib/ConsumerImplBase.h:
##########
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const int64_t createAt_;
+};
+
+class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {

Review Comment:
   I just found another problem, which is caused by the multiple inheritance. It is not caused by your PR; it is a long-standing problem in the code.
   
   IMO, the `HandlerBase` should inherit `enable_shared_from_this`, see
   
   https://github.com/apache/pulsar-client-cpp/blob/c1a98084ba6c41c8b8e4f822b0224a25d173f258/lib/HandlerBase.cc#L86
   
   and
   
   https://github.com/apache/pulsar-client-cpp/blob/c1a98084ba6c41c8b8e4f822b0224a25d173f258/lib/HandlerBase.cc#L141
   
   Obviously, it's against the OOP rule. However, since `HandlerBase` doesn't inherit `enable_shared_from_this`, there is no way to determine whether the `HandlerBase` object is still valid in the callback of its timer. That's because the derived classes (`ProducerImpl`, `ConsumerImpl`, etc.) have already inherited `enable_shared_from_this`.
   
   If `ConsumerImplBase` inherited `enable_shared_from_this`, it would be impossible to do the same for `HandlerBase`. I am going to fix this problem in my upcoming PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r991929535


##########
lib/BatchReceivePolicyImpl.h:
##########
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+using namespace pulsar;

Review Comment:
   ```suggestion
   #pragma once
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992403519


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();

Review Comment:
   Yes, you are right. `batchPendingReceiveMutex_` is only used to make accesses to `batchPendingReceives_` thread safe. I mixed it up.
   
   A lock is needed here to make `hasEnoughMessagesForBatchReceive` and `notifyBatchPendingReceivedCallback` execute atomically, which prevents users from receiving a batch of messages that does not comply with the policy.
   
   I added a new `batchReceiveOptionMutex_` lock to synchronize them.
   
   However, this is not entirely guaranteed, because I didn't synchronize the scenario where a single-message receive takes a message out of `incomingQueue_`.
   
   In the Java client implementation, these operations are all performed on the `internalPinnedExecutor` thread, so there is no problem there.
   
   I think it can be done this way first, and then I'll try to refactor it so that C++ also uses an internal thread to avoid adding various locks.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992298046


##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   Sorry, it is useless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r993123067


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.

Review Comment:
   @shibd 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r997660266


##########
lib/ConsumerImplBase.h:
##########
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const int64_t createAt_;
+};
+
+class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {

Review Comment:
   I'm going to push a new PR based on it. Merge this PR first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992318976


##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+            // cannot continue at this time, and the request needs to be ignored.

Review Comment:
   Why is it related to `runPartitionUpdateTask`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992300972


##########
lib/MessagesImpl.cc:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentNumberOfMessages_(0),
+      currentSizeOfMessages_(0) {
+    messageList_ = std::vector<Message>();
+}
+
+const std::vector<Message>& MessagesImpl::getMessageList() const { return messageList_; }
+
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (currentNumberOfMessages_ == 0) {
+        return true;
+    }
+
+    if (maxNumberOfMessages_ > 0 && currentNumberOfMessages_ + 1 > maxNumberOfMessages_) {
+        return false;
+    }
+
+    if (maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_) {
+        return false;
+    }
+
+    return true;
+}
+
+void MessagesImpl::add(const Message& message) {
+    if (!canAdd(message)) {
+        throw std::invalid_argument("No more space to add messages.");
+    }
+    currentNumberOfMessages_++;

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #21: [feat] Consumer support batch receive messages.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r990911396


##########
lib/ConsumerImpl.h:
##########
@@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerStatsBasePtr consumerStatsBasePtr_;
 
    private:
-    bool waitingForZeroQueueSizeMessage;
+    volatile bool waitingForZeroQueueSizeMessage;

Review Comment:
   Did you want to use `volatile` for thread-safety? The `volatile` keyword in C++ is very different from the one in Java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org