You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/10/18 02:33:47 UTC
[pulsar-client-cpp] branch main updated: [feat] Consumer support batch receive messages. (#21)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 7f7653b [feat] Consumer support batch receive messages. (#21)
7f7653b is described below
commit 7f7653b694996f38cf422d8bef24aa1fdfb21c59
Author: Baodi Shi <ba...@icloud.com>
AuthorDate: Tue Oct 18 10:33:41 2022 +0800
[feat] Consumer support batch receive messages. (#21)
### Motivation
https://github.com/apache/pulsar/issues/17140
This PR has been reviewed in [pulsar repo](https://github.com/apache/pulsar/pull/17429).
### Modifications
- Consumer support batch receives messages.
- Abstract common implementation to `ConsumerImplBase`.
---
include/pulsar/BatchReceivePolicy.h | 90 ++++++++++++++++
include/pulsar/Consumer.h | 25 +++++
include/pulsar/ConsumerConfiguration.h | 18 ++++
lib/BatchReceivePolicy.cc | 57 +++++++++++
lib/BatchReceivePolicyImpl.h | 29 ++++++
lib/ClientImpl.h | 5 +-
lib/Consumer.cc | 18 ++++
lib/ConsumerConfiguration.cc | 9 ++
lib/ConsumerConfigurationImpl.h | 1 +
lib/ConsumerImpl.cc | 174 +++++++++++++++++--------------
lib/ConsumerImpl.h | 15 +--
lib/ConsumerImplBase.cc | 141 +++++++++++++++++++++++++
lib/ConsumerImplBase.h | 45 +++++++-
lib/HandlerBase.h | 1 +
lib/MessagesImpl.cc | 58 +++++++++++
lib/MessagesImpl.h | 46 +++++++++
lib/MultiTopicsConsumerImpl.cc | 142 ++++++++++++++++++--------
lib/MultiTopicsConsumerImpl.h | 22 ++--
tests/BasicEndToEndTest.cc | 181 +++++++++++++++++++++++++++++++++
tests/BatchReceivePolicyTest.cc | 40 ++++++++
tests/ConsumerConfigurationTest.cc | 8 ++
tests/MessagesImplTest.cc | 61 +++++++++++
22 files changed, 1037 insertions(+), 149 deletions(-)
diff --git a/include/pulsar/BatchReceivePolicy.h b/include/pulsar/BatchReceivePolicy.h
new file mode 100644
index 0000000..3c66da2
--- /dev/null
+++ b/include/pulsar/BatchReceivePolicy.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef BATCH_RECEIVE_POLICY_HPP_
+#define BATCH_RECEIVE_POLICY_HPP_
+
+#include <pulsar/defines.h>
+#include <memory>
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl;
+
+/**
+ * Configuration for message batch receive {@link Consumer#batchReceive()} {@link
+ * Consumer#batchReceiveAsync()}.
+ *
+ * <p>Batch receive policy can limit the number and bytes of messages in a single batch, and can specify a
+ * timeout for waiting for enough messages for this batch.
+ *
+ * <p>A batch receive action is completed as soon as any one of the
+ * conditions (the batch has enough messages or bytes, or the wait timeout has passed) is met.
+ *
+ * <p>Examples:
+ * 1. If maxNumMessages = 10, maxNumBytes = 1MB and no timeout is set, {@link Consumer#batchReceive()}
+ * will always wait until there are enough messages.
+ * 2. If maxNumMessages <= 0 and maxNumBytes <= 0 with timeoutMs = 100, the constructor resets the limits
+ * to the defaults (maxNumMessages = -1, maxNumBytes = 10 * 1024 * 1024) and keeps the 100ms timeout.
+ *
+ * <p>Note:
+ * At least one of the limits (maxNumMessages, maxNumBytes) or the wait timeout must be positive.
+ * Otherwise, the three-argument constructor throws std::invalid_argument.
+ *
+ * @since 2.4.1
+ */
+class PULSAR_PUBLIC BatchReceivePolicy {
+ public:
+ /**
+ * Default value: {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+ */
+ BatchReceivePolicy();
+
+ /**
+ * Create a policy. Throws std::invalid_argument if all three arguments are <= 0; if only the two limits are <= 0, they are reset to the defaults.
+ * @param maxNumMessage Max number of messages per batch; a value <= 0 means no limit.
+ * @param maxNumBytes Max total bytes per batch; a value <= 0 means no limit.
+ * @param timeoutMs Wait timeout in milliseconds. If less than 0, it means no limit.
+ */
+ BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs);
+
+ /**
+ * Get the wait timeout in milliseconds.
+ *
+ * @return the timeout value stored by the constructor
+ */
+ long getTimeoutMs() const;
+
+ /**
+ * Get the maximum number of messages.
+ * @return the message-count limit stored by the constructor
+ */
+ int getMaxNumMessages() const;
+
+ /**
+ * Get the maximum number of bytes.
+ * @return the byte limit stored by the constructor
+ */
+ long getMaxNumBytes() const;
+
+ private:
+ std::shared_ptr<BatchReceivePolicyImpl> impl_;  // copies of a policy share this same impl object
+};
+} // namespace pulsar
+
+#endif /* BATCH_RECEIVE_POLICY_HPP_ */
diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index 6c0ab27..c7911b9 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -113,6 +113,31 @@ class PULSAR_PUBLIC Consumer {
*/
void receiveAsync(ReceiveCallback callback);
+ /**
+ * Batch receiving messages.
+ *
+ * <p>This call blocks until there are enough messages or the wait times out; for more details see {@link
+ * BatchReceivePolicy}.
+ *
+ * @param msgs a non-const reference where the received messages will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+ Result batchReceive(Messages& msgs);
+
+ /**
+ * Asynchronously receive a batch of messages.
+ * <p>
+ * Retrieves messages when they become available and completes the callback with the received messages.
+ * </p>
+ * <p>
+ * batchReceiveAsync() should be called again only once the callback has been completed with the received
+ * messages. Otherwise it creates a <i> backlog of receive requests </i> in the application.
+ * </p>
+ * @param callback the BatchReceiveCallback that will be completed when messages are available.
+ */
+ void batchReceiveAsync(BatchReceiveCallback callback);
+
/**
* Acknowledge the reception of a single message.
*
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 4347c3b..13d5cc0 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -31,6 +31,7 @@
#include <pulsar/InitialPosition.h>
#include <pulsar/KeySharedPolicy.h>
#include <pulsar/ConsumerEventListener.h>
+#include "BatchReceivePolicy.h"
namespace pulsar {
@@ -38,8 +39,10 @@ class Consumer;
class PulsarWrapper;
/// Callback definition for non-data operation
+typedef std::vector<Message> Messages;
typedef std::function<void(Result result)> ResultCallback;
typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
+typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
/// Callback definition for MessageListener
@@ -378,6 +381,21 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
InitialPosition getSubscriptionInitialPosition() const;
+ /**
+ * Set batch receive policy.
+ *
+ * @param batchReceivePolicy the default is
+ * {maxNumMessage: -1, maxNumBytes: 10 * 1024 * 1024, timeoutMs: 100}
+ */
+ void setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy);
+
+ /**
+ * Get batch receive policy.
+ *
+ * @return batch receive policy
+ */
+ const BatchReceivePolicy& getBatchReceivePolicy() const;
+
/**
* Set whether the subscription status should be replicated.
* The default value is `false`.
diff --git a/lib/BatchReceivePolicy.cc b/lib/BatchReceivePolicy.cc
new file mode 100644
index 0000000..08aa368
--- /dev/null
+++ b/lib/BatchReceivePolicy.cc
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/BatchReceivePolicy.h>
+#include "BatchReceivePolicyImpl.h"
+#include "LogUtils.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+DECLARE_LOG_OBJECT()
+
+BatchReceivePolicy::BatchReceivePolicy() : BatchReceivePolicy(-1, 10 * 1024 * 1024, 100) {}
+
+BatchReceivePolicy::BatchReceivePolicy(int maxNumMessage, long maxNumBytes, long timeoutMs)
+    : impl_(std::make_shared<BatchReceivePolicyImpl>()) {  // rejects an empty policy, else stores the limits
+    if (maxNumMessage <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {  // nothing is specified at all
+        throw std::invalid_argument(
+            "At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
+    }
+    if (maxNumMessage <= 0 && maxNumBytes <= 0) {  // only a timeout was given: use the default limits
+        impl_->maxNumMessage = -1;
+        impl_->maxNumBytes = 10 * 1024 * 1024;
+        LOG_WARN(
+            "BatchReceivePolicy maxNumMessages and maxNumBytes is less than 0. Reset to default: "
+            "maxNumMessage(-1), maxNumBytes(10 * 1024 * 1024)");  // text must match the value stored above
+    } else {
+        impl_->maxNumMessage = maxNumMessage;
+        impl_->maxNumBytes = maxNumBytes;
+    }
+    impl_->timeoutMs = timeoutMs;  // stored unchanged in both branches
+}
+
+long BatchReceivePolicy::getTimeoutMs() const { return impl_->timeoutMs; }  // exactly the value given to the constructor
+
+int BatchReceivePolicy::getMaxNumMessages() const { return impl_->maxNumMessage; }  // -1 if the constructor fell back to the defaults
+
+long BatchReceivePolicy::getMaxNumBytes() const { return impl_->maxNumBytes; }  // 10 * 1024 * 1024 if the constructor fell back to the defaults
+
+} // namespace pulsar
diff --git a/lib/BatchReceivePolicyImpl.h b/lib/BatchReceivePolicyImpl.h
new file mode 100644
index 0000000..e7ba431
--- /dev/null
+++ b/lib/BatchReceivePolicyImpl.h
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace pulsar {
+
+struct BatchReceivePolicyImpl {  // plain value holder behind BatchReceivePolicy; filled in by its constructor
+ int maxNumMessage;  // max messages per batch; values <= 0 are treated as "no limit" by the consumer
+ long maxNumBytes;  // max total bytes per batch; values <= 0 are treated as "no limit" by the consumer
+ long timeoutMs;  // wait timeout in milliseconds
+};
+
+} // namespace pulsar
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 466461a..e8e7708 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -28,14 +28,12 @@
#include <mutex>
#include <lib/TopicName.h>
#include "ProducerImplBase.h"
-#include "ConsumerImplBase.h"
#include <atomic>
#include <vector>
#include "ServiceNameResolver.h"
namespace pulsar {
-class ClientImpl;
class PulsarFriend;
typedef std::shared_ptr<ClientImpl> ClientImplPtr;
typedef std::weak_ptr<ClientImpl> ClientImplWeakPtr;
@@ -44,6 +42,9 @@ class ReaderImpl;
typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
+class ConsumerImplBase;
+typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
+
std::string generateRandomName();
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
diff --git a/lib/Consumer.cc b/lib/Consumer.cc
index 5d16362..13fb9f4 100644
--- a/lib/Consumer.cc
+++ b/lib/Consumer.cc
@@ -82,6 +82,24 @@ void Consumer::receiveAsync(ReceiveCallback callback) {
impl_->receiveAsync(callback);
}
+Result Consumer::batchReceive(Messages& msgs) {
+    // Blocking variant: start an asynchronous batch receive, then wait on the
+    // promise it completes. An uninitialized consumer fails immediately.
+    if (!impl_) return ResultConsumerNotInitialized;
+    Promise<Result, Messages> batchPromise;
+    impl_->batchReceiveAsync(WaitForCallbackValue<Messages>(batchPromise));
+    return batchPromise.getFuture().get(msgs);
+}
+
+void Consumer::batchReceiveAsync(BatchReceiveCallback callback) {
+    // With a live implementation simply forward the request; otherwise report
+    // the failure synchronously together with an empty batch.
+    if (impl_) {
+        impl_->batchReceiveAsync(callback);
+        return;
+    }
+    callback(ResultConsumerNotInitialized, Messages{});
+}
+
Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
Result Consumer::acknowledge(const MessageId& messageId) {
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index f9fe499..0705cca 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -19,6 +19,7 @@
#include <lib/ConsumerConfigurationImpl.h>
#include <stdexcept>
+#include <pulsar/ConsumerConfiguration.h>
namespace pulsar {
@@ -267,4 +268,12 @@ ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool st
bool ConsumerConfiguration::isStartMessageIdInclusive() const { return impl_->startMessageIdInclusive; }
+void ConsumerConfiguration::setBatchReceivePolicy(const BatchReceivePolicy& batchReceivePolicy) {
+ impl_->batchReceivePolicy = batchReceivePolicy;  // copy-assigns the policy into this configuration's impl
+}
+
+const BatchReceivePolicy& ConsumerConfiguration::getBatchReceivePolicy() const {
+ return impl_->batchReceivePolicy;  // reference to the policy stored in impl_, not a copy
+}
+
} // namespace pulsar
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index cca83a3..444fedf 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl {
ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
bool readCompacted{false};
InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
+ BatchReceivePolicy batchReceivePolicy{};
int patternAutoDiscoveryPeriod{60};
bool replicateSubscriptionStateEnabled{false};
std::map<std::string, std::string> properties;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 37fcd95..54e346f 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -18,6 +18,7 @@
*/
#include "ConsumerImpl.h"
#include "MessageImpl.h"
+#include "MessagesImpl.h"
#include "Commands.h"
#include "LogUtils.h"
#include "TimeUtils.h"
@@ -43,7 +44,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
- : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
+ : ConsumerImplBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+ listenerExecutor ? listenerExecutor : client->getListenerExecutorProvider()->get()),
waitingForZeroQueueSizeMessage(false),
config_(conf),
subscription_(subscriptionName),
@@ -85,13 +87,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
- // Initialize listener executor.
- if (listenerExecutor) {
- listenerExecutor_ = listenerExecutor;
- } else {
- listenerExecutor_ = client->getListenerExecutorProvider()->get();
- }
-
// Setup stats reporter.
unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
@@ -145,12 +140,12 @@ const std::string& ConsumerImpl::getTopic() const { return topic_; }
void ConsumerImpl::start() {
HandlerBase::start();
- // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+ // Initialize ackGroupingTrackerPtr_ here because get_shared_this_ptr() cannot be used until the
// constructor completed.
if (TopicName::get(topic_)->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
- client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+ client_.lock(), get_shared_this_ptr(), consumerId_, config_.getAckGroupingTimeMs(),
config_.getAckGroupingMaxSize()));
} else {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
@@ -169,7 +164,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
// sending the subscribe request.
- cnx->registerConsumer(consumerId_, shared_from_this());
+ cnx->registerConsumer(consumerId_, get_shared_this_ptr());
if (duringSeek_) {
ackGroupingTrackerPtr_->flushAndClean();
@@ -195,13 +190,13 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
cnx->sendRequestWithId(cmd, requestId)
- .addListener(
- std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
+ .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
+ std::placeholders::_1));
}
void ConsumerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
- ConsumerImplPtr ptr = shared_from_this();
+ auto ptr = get_shared_this_ptr();
if (consumerCreatedPromise_.setFailed(result)) {
state_ = Failed;
@@ -244,7 +239,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
sendFlowPermitsToBroker(cnx, 1);
}
}
- consumerCreatedPromise_.setValue(shared_from_this());
+ consumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
if (result == ResultTimeout) {
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
@@ -257,12 +252,12 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
if (consumerCreatedPromise_.isComplete()) {
// Consumer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
- scheduleReconnection(shared_from_this());
+ scheduleReconnection(get_shared_this_ptr());
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
if (isRetriableError(result) && (creationTimestamp_ + operationTimeut_ < TimeUtils::now())) {
LOG_WARN(getName() << "Temporary error in creating consumer : " << strResult(result));
- scheduleReconnection(shared_from_this());
+ scheduleReconnection(get_shared_this_ptr());
} else {
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
consumerCreatedPromise_.setFailed(result);
@@ -292,7 +287,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
int requestId = client->newRequestId();
SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
cnx->sendRequestWithId(cmd, requestId)
- .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, shared_from_this(),
+ .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
std::placeholders::_1, callback));
} else {
Result result = ResultNotConnected;
@@ -460,34 +455,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
<< startMessageId.value());
return;
}
-
- Lock lock(pendingReceiveMutex_);
- // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
- bool asyncReceivedWaiting = !pendingReceives_.empty();
- ReceiveCallback callback;
- if (asyncReceivedWaiting) {
- callback = pendingReceives_.front();
- pendingReceives_.pop();
- }
- lock.unlock();
-
- if (asyncReceivedWaiting) {
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
- shared_from_this(), ResultOk, m, callback));
- return;
- }
-
- // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
- if (config_.getReceiverQueueSize() != 0 ||
- (config_.getReceiverQueueSize() == 0 && messageListener_)) {
- incomingMessages_.push(m);
- } else {
- Lock lock(mutex_);
- if (waitingForZeroQueueSizeMessage) {
- lock.unlock();
- incomingMessages_.push(m);
- }
- }
+ executeNotifyCallback(m);
}
if (messageListener_) {
@@ -496,7 +464,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}
// Trigger message listener callback in a separate thread
while (numOfMessageReceived--) {
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+ listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
}
}
}
@@ -504,16 +472,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
void ConsumerImpl::activeConsumerChanged(bool isActive) {
if (eventListener_) {
listenerExecutor_->postWork(
- std::bind(&ConsumerImpl::internalConsumerChangeListener, shared_from_this(), isActive));
+ std::bind(&ConsumerImpl::internalConsumerChangeListener, get_shared_this_ptr(), isActive));
}
}
void ConsumerImpl::internalConsumerChangeListener(bool isActive) {
try {
if (isActive) {
- eventListener_->becameActive(Consumer(shared_from_this()), partitionIndex_);
+ eventListener_->becameActive(Consumer(get_shared_this_ptr()), partitionIndex_);
} else {
- eventListener_->becameInactive(Consumer(shared_from_this()), partitionIndex_);
+ eventListener_->becameInactive(Consumer(get_shared_this_ptr()), partitionIndex_);
}
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from event listener " << e.what());
@@ -527,11 +495,56 @@ void ConsumerImpl::failPendingReceiveCallback() {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
- shared_from_this(), ResultAlreadyClosed, msg, callback));
+ get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}
+void ConsumerImpl::executeNotifyCallback(Message& msg) {  // routes one received message: pending receiveAsync callback first, else the incoming queue
+ Lock lock(pendingReceiveMutex_);
+ // If a receiveAsync() callback is pending, take it under the lock so the message bypasses the incoming queue.
+ bool asyncReceivedWaiting = !pendingReceives_.empty();
+ ReceiveCallback callback;
+ if (asyncReceivedWaiting) {
+ callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ }
+ lock.unlock();
+
+ // A receive was pending: complete it on the listener executor and stop here.
+ if (asyncReceivedWaiting) {
+ listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+ get_shared_this_ptr(), ResultOk, msg, callback));
+ return;
+ }
+
+ // Otherwise queue the message when a listener is set, the receiver queue size is non-zero,
+ // or a zero-queue-size receive is waiting; its length is added to the queued-bytes counter.
+ if (messageListener_ || config_.getReceiverQueueSize() != 0 || waitingForZeroQueueSizeMessage) {
+ incomingMessages_.push(msg);
+ incomingMessagesSize_.fetch_add(msg.getLength());
+ }
+
+ // Trigger pending batch receives once the policy limits are reached (base-class helper defined outside this view).
+ Lock batchOptionLock(batchReceiveOptionMutex_);
+ if (hasEnoughMessagesForBatchReceive()) {
+ ConsumerImplBase::notifyBatchPendingReceivedCallback();
+ }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {  // drains queued messages into one batch and posts it to callback
+ auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+ batchReceivePolicy_.getMaxNumBytes());  // batch container built from the policy's two limits
+ Message peekMsg;
+ while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {  // NOTE(review): the message is popped before canAdd() is checked; if canAdd() rejects it, it appears to be dropped rather than re-queued - confirm
+ messageProcessed(peekMsg);
+ messages->add(peekMsg);
+ }
+ auto self = get_shared_this_ptr();  // captured below only to keep this consumer alive until the callback has run
+ listenerExecutor_->postWork(
+ [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
const ReceiveCallback& callback) {
if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
@@ -573,19 +586,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
}
}
- //
- Lock lock(pendingReceiveMutex_);
- if (!pendingReceives_.empty()) {
- ReceiveCallback callback = pendingReceives_.front();
- pendingReceives_.pop();
- lock.unlock();
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
- shared_from_this(), ResultOk, msg, callback));
- } else {
- // Regular path, append individual message to incoming messages queue
- incomingMessages_.push(msg);
- lock.unlock();
- }
+ executeNotifyCallback(msg);
}
if (skippedMessages > 0) {
@@ -698,7 +699,7 @@ void ConsumerImpl::internalListener() {
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessageId_ = msg.getMessageId();
- messageListener_(Consumer(shared_from_this()), msg);
+ messageListener_(Consumer(get_shared_this_ptr()), msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
}
@@ -721,9 +722,7 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
getName() << "The incoming message queue should never be greater than 0 when Queue size is 0");
incomingMessages_.clear();
}
- Lock localLock(mutex_);
waitingForZeroQueueSizeMessage = true;
- localLock.unlock();
sendFlowPermitsToBroker(currentCnx, 1);
@@ -745,7 +744,6 @@ Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
}
}
}
- return ResultOk;
}
Result ConsumerImpl::receive(Message& msg) {
@@ -837,6 +835,8 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
lastDequedMessageId_ = msg.getMessageId();
lock.unlock();
+ incomingMessagesSize_.fetch_sub(msg.getLength());
+
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
LOG_DEBUG(getName() << "Not adding permit since connection is different.");
@@ -934,7 +934,7 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com
}
void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
- ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+ ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
callback, proto::CommandAck_AckType_Individual);
if (msgId.batchIndex() != -1 &&
!batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
@@ -945,7 +945,7 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
}
void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
- ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
+ ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, get_shared_this_ptr(), std::placeholders::_1,
callback, proto::CommandAck_AckType_Cumulative);
if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
cb(ResultCumulativeAcknowledgementNotAllowedError);
@@ -993,12 +993,12 @@ void ConsumerImpl::disconnectConsumer() {
Lock lock(mutex_);
connection_.reset();
lock.unlock();
- scheduleReconnection(shared_from_this());
+ scheduleReconnection(get_shared_this_ptr());
}
void ConsumerImpl::closeAsync(ResultCallback callback) {
// Keep a reference to ensure object is kept alive
- ConsumerImplPtr ptr = shared_from_this();
+ ConsumerImplPtr ptr = get_shared_this_ptr();
if (state_ != Ready) {
if (callback) {
@@ -1041,12 +1041,16 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
if (callback) {
// Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
- future.addListener(
- std::bind(&ConsumerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
+ future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
+ callback, ptr));
}
// fail pendingReceive callback
failPendingReceiveCallback();
+ failPendingBatchReceiveCallback();
+
+ // cancel timer
+ batchReceiveTimer_->cancel();
}
void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
@@ -1102,7 +1106,7 @@ Result ConsumerImpl::resumeMessageListener() {
for (size_t i = 0; i < count; i++) {
// Trigger message listener callback in a separate thread
- listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, shared_from_this()));
+ listenerExecutor_->postWork(std::bind(&ConsumerImpl::internalListener, get_shared_this_ptr()));
}
// Check current permits and determine whether to send FLOW command
this->increaseAvailablePermits(getCnx().lock(), 0);
@@ -1167,7 +1171,7 @@ void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callb
<< ", requestId - " << requestId);
cnx->newConsumerStats(consumerId_, requestId)
- .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(),
+ .addListener(std::bind(&ConsumerImpl::brokerConsumerStatsListener, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, callback));
return;
} else {
@@ -1303,7 +1307,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
LOG_DEBUG(getName() << " Sending getLastMessageId Command for Consumer - " << getConsumerId()
<< ", requestId - " << requestId);
- auto self = shared_from_this();
+ auto self = get_shared_this_ptr();
cnx->newGetLastMessageId(consumerId_, requestId)
.addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result == ResultOk) {
@@ -1384,7 +1388,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
LOG_INFO(getName() << " Seeking subscription to " << seekId);
}
- std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+ std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
cnx->sendRequestWithId(seek, requestId)
.addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
@@ -1419,4 +1423,18 @@ bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
: idx <= startMessageId_.get().value().entryId();
}
+bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    // A non-positive limit is treated as disabled and can never satisfy the check.
+    const auto maxNumMessages = batchReceivePolicy_.getMaxNumMessages();
+    const auto maxNumBytes = batchReceivePolicy_.getMaxNumBytes();
+    if (maxNumMessages > 0 && incomingMessages_.size() >= maxNumMessages) {
+        return true;
+    }
+    return maxNumBytes > 0 && incomingMessagesSize_ >= maxNumBytes;
+}
+
+std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
+ return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this());  // downcasts the inherited shared_from_this() result to ConsumerImpl
+}
+
} /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 1ad3a4c..09d2c5c 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -65,9 +65,7 @@ enum ConsumerTopicType
Partitioned
};
-class ConsumerImpl : public ConsumerImplBase,
- public HandlerBase,
- public std::enable_shared_from_this<ConsumerImpl> {
+class ConsumerImpl : public ConsumerImplBase {
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration&, bool isPersistent,
@@ -147,7 +145,10 @@ class ConsumerImpl : public ConsumerImplBase,
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override;
void connectionFailed(Result result) override;
- HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+ // impl methods from ConsumerImpl base
+ bool hasEnoughMessagesForBatchReceive() const override;
+ void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
@@ -159,7 +160,8 @@ class ConsumerImpl : public ConsumerImplBase,
ConsumerStatsBasePtr consumerStatsBasePtr_;
private:
- bool waitingForZeroQueueSizeMessage;
+ std::atomic_bool waitingForZeroQueueSizeMessage;
+ std::shared_ptr<ConsumerImpl> get_shared_this_ptr();
bool uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageIdData,
const proto::MessageMetadata& metadata, SharedBuffer& payload,
bool checkMaxMessageSize);
@@ -180,6 +182,7 @@ class ConsumerImpl : public ConsumerImplBase,
Result receiveHelper(Message& msg);
Result receiveHelper(Message& msg, int timeout);
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+ void executeNotifyCallback(Message& msg);
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
@@ -199,13 +202,13 @@ class ConsumerImpl : public ConsumerImplBase,
const bool isPersistent_;
MessageListener messageListener_;
ConsumerEventListenerPtr eventListener_;
- ExecutorServicePtr listenerExecutor_;
bool hasParent_;
ConsumerTopicType consumerTopicType_;
const Commands::SubscriptionMode subscriptionMode_;
UnboundedBlockingQueue<Message> incomingMessages_;
+ std::atomic_int incomingMessagesSize_ = {0};
std::queue<ReceiveCallback> pendingReceives_;
std::atomic_int availablePermits_;
const int receiverQueueRefillThreshold_;
diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc
new file mode 100644
index 0000000..4a8c027
--- /dev/null
+++ b/lib/ConsumerImplBase.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+// Stores the listener executor and the batch receive policy taken from `conf`,
+// then creates the deadline timer used for batch receive on that executor.
+// When the configured maxNumMessages is larger than the receiver queue size it
+// is replaced by the receiver queue size (maxNumBytes and timeoutMs are kept)
+// and a warning is logged.
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto requestedPolicy = conf.getBatchReceivePolicy();
+    const auto queueSize = conf.getReceiverQueueSize();
+    const bool exceedsQueue = requestedPolicy.getMaxNumMessages() > queueSize;
+    if (exceedsQueue) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(queueSize, requestedPolicy.getMaxNumBytes(), requestedPolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << requestedPolicy.getMaxNumMessages()
+                                                        << "} is greater than maxReceiverQueueSize: {"
+                                                        << queueSize
+                                                        << "}, reset to "
+                                                           "maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+// Arms the batch receive timer so that doBatchReceiveTimeTask() runs after
+// `timeoutMs` milliseconds. Does nothing when `timeoutMs` is not positive.
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs <= 0) {
+        return;
+    }
+    batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+    // The handler holds only a weak reference to this consumer.
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    batchReceiveTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+        const auto self = weakSelf.lock();
+        // Skip the task if the consumer is gone or the wait reported an error code.
+        if (!self || ec) {
+            return;
+        }
+        self->doBatchReceiveTimeTask();
+    });
+}
+
+// Timer task for batch receive timeouts. Completes every pending batch receive
+// whose age has reached the policy timeout, oldest first, and re-arms the timer
+// for the first request that has not expired yet. Does nothing unless the
+// consumer state is Ready.
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = 0;
+
+    // Lock order: batchReceiveOptionMutex_ first, then batchPendingReceiveMutex_.
+    // batchReceiveAsync() acquires them in this order; acquiring them the other
+    // way round here could deadlock against a concurrent batchReceiveAsync().
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - (TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff > 0) {
+            // The oldest request has not timed out yet: wait for the remainder.
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+        // Notify before pop(): batchReceive refers to the queue's front element.
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+        batchPendingReceives_.pop();
+    }
+    lock.unlock();
+    batchOptionLock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+// Drains the queue of pending batch receive requests and completes each one
+// with ResultAlreadyClosed and an empty message list. The callbacks are posted
+// to the listener executor instead of being invoked inline.
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        const OpBatchReceive pendingOp = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto failTask = [pendingOp]() { pendingOp.batchReceiveCallback_(ResultAlreadyClosed, {}); };
+        listenerExecutor_->postWork(failTask);
+    }
+}
+
+// Completes the oldest pending batch receive request, if there is one, by
+// passing its callback to notifyBatchPendingReceivedCallback(const BatchReceiveCallback&).
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (batchPendingReceives_.empty()) {
+        return;
+    }
+    // Copy the callback out before pop(): pop() destroys the front element, so a
+    // reference to it would dangle by the time the callback is used below.
+    const BatchReceiveCallback callback = batchPendingReceives_.front().batchReceiveCallback_;
+    batchPendingReceives_.pop();
+    // Release the queue mutex before handing the callback over.
+    lock.unlock();
+    notifyBatchPendingReceivedCallback(callback);
+}
+
+// Asynchronous batch receive. If the consumer is not Ready, `callback` is
+// invoked immediately with ResultAlreadyClosed and an empty Messages. If enough
+// messages are already available, the callback is handed to
+// notifyBatchPendingReceivedCallback(); otherwise the request is queued and the
+// batch receive timer is armed with the policy timeout.
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    Lock batchOptionLock(batchReceiveOptionMutex_);
+    if (hasEnoughMessagesForBatchReceive()) {
+        notifyBatchPendingReceivedCallback(callback);
+        return;  // batchOptionLock is released on scope exit
+    }
+
+    // Not enough messages yet: park the request. The creation timestamp is taken
+    // before the queue mutex is acquired.
+    OpBatchReceive pendingOp(callback);
+    {
+        Lock queueLock(batchPendingReceiveMutex_);
+        batchPendingReceives_.emplace(pendingOp);
+    }
+    triggerBatchReceiveTimerTask(batchReceivePolicy_.getTimeoutMs());
+}
+
+// Wraps a pending batch receive callback and records the creation time
+// (TimeUtils::currentTimeMillis()) in createAt_.
+OpBatchReceive::OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback)
+    : batchReceiveCallback_(batchReceiveCallback),
+      createAt_(TimeUtils::currentTimeMillis()) {
+}
+
+} /* namespace pulsar */
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 693d4da..18b8bc1 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -20,23 +20,38 @@
#define PULSAR_CONSUMER_IMPL_BASE_HEADER
#include <pulsar/Message.h>
#include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
#include <set>
namespace pulsar {
class ConsumerImplBase;
+class HandlerBase;
typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
-class ConsumerImplBase {
+class OpBatchReceive {
public:
- virtual ~ConsumerImplBase() {}
+ OpBatchReceive();
+ explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+ const BatchReceiveCallback batchReceiveCallback_;
+ const int64_t createAt_;
+};
+
+class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {
+ public:
+ virtual ~ConsumerImplBase(){};
+ ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
+ const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor);
+
+ // interface by consumer
virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() = 0;
- virtual const std::string& getSubscriptionName() const = 0;
virtual const std::string& getTopic() const = 0;
+ virtual const std::string& getSubscriptionName() const = 0;
virtual Result receive(Message& msg) = 0;
virtual Result receive(Message& msg, int timeout) = 0;
virtual void receiveAsync(ReceiveCallback& callback) = 0;
+ void batchReceiveAsync(BatchReceiveCallback callback);
virtual void unsubscribeAsync(ResultCallback callback) = 0;
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -49,7 +64,6 @@ class ConsumerImplBase {
virtual Result resumeMessageListener() = 0;
virtual void redeliverUnacknowledgedMessages() = 0;
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
- virtual const std::string& getName() const = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
@@ -57,6 +71,27 @@ class ConsumerImplBase {
virtual void negativeAcknowledge(const MessageId& msgId) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedConsumer() = 0;
+ // overridden methods from HandlerBase
+ virtual const std::string& getName() const override = 0;
+
+ protected:
+ // overridden methods from HandlerBase
+ void connectionOpened(const ClientConnectionPtr& cnx) override {}
+ void connectionFailed(Result result) override {}
+ HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
+
+ // consumer impl generic method.
+ ExecutorServicePtr listenerExecutor_;
+ std::queue<OpBatchReceive> batchPendingReceives_;
+ BatchReceivePolicy batchReceivePolicy_;
+ DeadlineTimerPtr batchReceiveTimer_;
+ std::mutex batchReceiveOptionMutex_;
+ void triggerBatchReceiveTimerTask(long timeoutMs);
+ void doBatchReceiveTimeTask();
+ void failPendingBatchReceiveCallback();
+ void notifyBatchPendingReceivedCallback();
+ virtual void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) = 0;
+ virtual bool hasEnoughMessagesForBatchReceive() const = 0;
private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled) = 0;
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 1184746..6fc3603 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -90,6 +90,7 @@ class HandlerBase {
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
+ std::mutex batchPendingReceiveMutex_;
ptime creationTimestamp_;
const TimeDuration operationTimeut_;
diff --git a/lib/MessagesImpl.cc b/lib/MessagesImpl.cc
new file mode 100644
index 0000000..7d45cdd
--- /dev/null
+++ b/lib/MessagesImpl.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+// Creates an empty container limited to `maxNumberOfMessages` messages and a
+// total message length of `maxSizeOfMessages`. canAdd() does not enforce a
+// limit that is zero or negative.
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : messageList_(),
+      maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentSizeOfMessages_(0) {
+}
+
+// Read-only access to the messages added so far, in insertion order.
+const std::vector<Message>& MessagesImpl::getMessageList() const {
+    return messageList_;
+}
+
+// Tells whether `message` may be appended without exceeding the configured
+// limits. An empty container accepts any message, and a limit that is zero or
+// negative is not enforced.
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (messageList_.empty()) {
+        return true;
+    }
+
+    const bool countExceeded = maxNumberOfMessages_ > 0 && messageList_.size() + 1 > maxNumberOfMessages_;
+    if (countExceeded) {
+        return false;
+    }
+
+    const bool sizeExceeded =
+        maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() > maxSizeOfMessages_;
+    return !sizeExceeded;
+}
+
+// Appends `message` and adds its length to the running total.
+// Throws std::invalid_argument when canAdd(message) is false.
+void MessagesImpl::add(const Message& message) {
+    if (canAdd(message)) {
+        currentSizeOfMessages_ += message.getLength();
+        messageList_.emplace_back(message);
+        return;
+    }
+    throw std::invalid_argument("No more space to add messages.");
+}
+
+// Number of messages currently held, converted to int.
+int MessagesImpl::size() const {
+    return static_cast<int>(messageList_.size());
+}
+
+// Removes every message and resets the accumulated length to zero.
+void MessagesImpl::clear() {
+    messageList_.clear();
+    currentSizeOfMessages_ = 0;
+}
diff --git a/lib/MessagesImpl.h b/lib/MessagesImpl.h
new file mode 100644
index 0000000..0c12768
--- /dev/null
+++ b/lib/MessagesImpl.h
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_CPP_MESSAGESIMPL_H
+#define PULSAR_CPP_MESSAGESIMPL_H
+
+#include <vector>
+#include <pulsar/Message.h>
+
+using namespace pulsar;
+
+namespace pulsar {
+
+// Container for a batch of messages, bounded by a maximum message count and a
+// maximum total message length supplied at construction.
+class MessagesImpl {
+   public:
+    // A limit that is zero or negative is not enforced by canAdd().
+    MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages);
+
+    // Messages added so far.
+    const std::vector<Message>& getMessageList() const;
+
+    // Whether `message` can be appended without exceeding the limits.
+    bool canAdd(const Message& message) const;
+
+    // Appends `message`; throws std::invalid_argument when canAdd(message) is false.
+    void add(const Message& message);
+
+    // Number of messages held.
+    int size() const;
+
+    // Drops all messages and resets the accumulated length.
+    void clear();
+
+   private:
+    std::vector<Message> messageList_;
+    const int maxNumberOfMessages_;
+    const long maxSizeOfMessages_;
+    // Sum of getLength() over the messages in messageList_.
+    long currentSizeOfMessages_;
+};
+
+} // namespace pulsar
+#endif // PULSAR_CPP_MESSAGESIMPL_H
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 0d730e1..573c33d 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -18,6 +18,7 @@
*/
#include "MultiTopicsConsumerImpl.h"
#include "MultiResultCallback.h"
+#include "MessagesImpl.h"
DECLARE_LOG_OBJECT()
@@ -27,12 +28,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
- : client_(client),
+ : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
+ Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
+ client->getListenerExecutorProvider()->get()),
+ client_(client),
subscriptionName_(subscriptionName),
- topic_(topicName ? topicName->toString() : "EmptyTopics"),
conf_(conf),
- messages_(conf.getReceiverQueueSize()),
- listenerExecutor_(client->getListenerExecutorProvider()->get()),
+ incomingMessages_(conf.getReceiverQueueSize()),
messageListener_(conf.getMessageListener()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
@@ -59,14 +61,16 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
lookupServicePtr_ = client_->getLookup();
}
+
+ state_ = Pending;
}
void MultiTopicsConsumerImpl::start() {
if (topics_.empty()) {
- MultiTopicsConsumerState state = Pending;
+ State state = Pending;
if (state_.compare_exchange_strong(state, Ready)) {
LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
- multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+ multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
return;
} else {
LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
@@ -81,7 +85,7 @@ void MultiTopicsConsumerImpl::start() {
// subscribe for each passed in topic
for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
- shared_from_this(), std::placeholders::_1,
+ get_shared_this_ptr(), std::placeholders::_1,
std::placeholders::_2, *itr, topicsNeedCreate));
}
}
@@ -100,10 +104,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
}
if (--(*topicsNeedCreate) == 0) {
- MultiTopicsConsumerState state = Pending;
+ State state = Pending;
if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
- multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
+ multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// unsubscribed all of the successfully subscribed partitioned consumers
@@ -162,7 +166,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
ConsumerConfiguration config = conf_.clone();
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
- config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+ config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2));
int partitions = numPartitions == 0 ? 1 : numPartitions;
@@ -186,8 +190,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
topicName->isPersistent(), internalListenerExecutor, true,
NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
- &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
- std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+ &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
+ std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumers_.emplace(topicName->toString(), consumer);
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
consumer->start();
@@ -199,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
topicName->isPersistent(), internalListenerExecutor,
true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
- &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+ &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(i);
consumers_.emplace(topicPartitionName, consumer);
@@ -236,7 +240,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
- topicSubResultPromise->setValue(Consumer(shared_from_this()));
+ topicSubResultPromise->setValue(Consumer(get_shared_this_ptr()));
}
}
@@ -252,7 +256,7 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
state_ = Closing;
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
- auto self = shared_from_this();
+ auto self = get_shared_this_ptr();
int numConsumers = 0;
consumers_.forEachValue(
[&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
@@ -329,7 +333,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
}
optConsumer.value()->unsubscribeAsync(
- std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(),
+ std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, get_shared_this_ptr(),
std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName,
callback));
}
@@ -385,7 +389,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
state_ = Closing;
- std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+ std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
int numConsumers = 0;
consumers_.clear(
[this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -414,7 +418,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
}
// closed all consumers
if (numConsumersLeft == 0) {
- messages_.clear();
+ incomingMessages_.clear();
topicsPartitions_.clear();
unAckedMessageTrackerPtr_->clear();
@@ -440,6 +444,10 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
// fail pending receive
failPendingReceiveCallback();
+ failPendingBatchReceiveCallback();
+
+ // cancel timer
+ batchReceiveTimer_->cancel();
}
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
@@ -454,25 +462,39 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
pendingReceives_.pop();
lock.unlock();
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
- shared_from_this(), ResultOk, msg, callback));
- } else {
- if (messages_.full()) {
- lock.unlock();
- }
+ get_shared_this_ptr(), ResultOk, msg, callback));
+ return;
+ }
- if (messages_.push(msg) && messageListener_) {
- listenerExecutor_->postWork(
- std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
- }
+ if (incomingMessages_.full()) {
+ lock.unlock();
+ }
+
+ // Add the message to the blocking queue.
+ // When the queue is full, this blocks the listener thread of ConsumerImpl,
+ // so no more permits are sent to the broker and the broker stops pushing messages.
+ incomingMessages_.push(msg);
+ incomingMessagesSize_.fetch_add(msg.getLength());
+
+ // try trigger pending batch messages
+ Lock batchOptionLock(batchReceiveOptionMutex_);
+ if (hasEnoughMessagesForBatchReceive()) {
+ ConsumerImplBase::notifyBatchPendingReceivedCallback();
+ }
+ batchOptionLock.unlock();
+
+ if (messageListener_) {
+ listenerExecutor_->postWork(
+ std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer));
}
}
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
- messages_.pop(m);
- unAckedMessageTrackerPtr_->add(m.getMessageId());
+ incomingMessages_.pop(m);
try {
- messageListener_(Consumer(shared_from_this()), m);
+ messageListener_(Consumer(get_shared_this_ptr()), m);
+ messageProcessed(m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
}
@@ -487,9 +509,9 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
- messages_.pop(msg);
+ incomingMessages_.pop(msg);
+ messageProcessed(msg);
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
}
@@ -503,8 +525,8 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}
- if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
+ messageProcessed(msg);
return ResultOk;
} else {
if (state_ != Ready) {
@@ -524,9 +546,9 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
}
Lock lock(pendingReceiveMutex_);
- if (messages_.pop(msg, std::chrono::milliseconds(0))) {
+ if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ messageProcessed(msg);
callback(ResultOk, msg);
} else {
pendingReceives_.push(callback);
@@ -536,14 +558,14 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
Message msg;
- messages_.close();
+ incomingMessages_.close();
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
- shared_from_this(), ResultAlreadyClosed, msg, callback));
+ get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}
@@ -649,7 +671,7 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
});
}
-int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
+int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
if (state_ != Ready) {
@@ -662,7 +684,7 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
lock.unlock();
- auto self = shared_from_this();
+ auto self = get_shared_this_ptr();
size_t i = 0;
consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
size_t index = i++;
@@ -750,7 +772,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
}
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
- std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+ std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
// If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
// cannot continue at this time, and the request needs to be ignored.
@@ -769,7 +791,7 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
auto topicName = TopicName::get(item.first);
auto currentNumPartitions = item.second;
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
- std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName,
+ std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
}
}
@@ -810,7 +832,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
ConsumerConfiguration config = conf_.clone();
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
- config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+ config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2));
// Apply total limit of receiver queue size across partitions
@@ -824,7 +846,7 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
topicName->isPersistent(), internalListenerExecutor, true,
Partitioned);
consumer->getConsumerCreatedFuture().addListener(
- std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+ std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(partitionIndex);
consumer->start();
@@ -832,3 +854,35 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
<< " consumerSize: " << consumers_.size());
}
+
+// Returns true when the queued messages already satisfy the batch receive
+// policy, i.e. their count or their accumulated size has reached the configured
+// maximum. A maximum that is zero or negative is never matched.
+bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const {
+    const auto countLimit = batchReceivePolicy_.getMaxNumMessages();
+    const auto bytesLimit = batchReceivePolicy_.getMaxNumBytes();
+
+    bool enough = false;
+    if (countLimit > 0) {
+        enough = incomingMessages_.size() >= countLimit;
+    }
+    if (!enough && bytesLimit > 0) {
+        enough = incomingMessagesSize_ >= bytesLimit;
+    }
+    return enough;
+}
+
+// Builds a batch from the messages currently queued (without waiting for new
+// ones) and posts `callback` with ResultOk and that batch to the listener
+// executor. Each message placed in the batch goes through messageProcessed().
+void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
+    const auto maxNumMessages = batchReceivePolicy_.getMaxNumMessages();
+    auto messages = std::make_shared<MessagesImpl>(maxNumMessages, batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (true) {
+        // Stop BEFORE popping once the count limit is reached; popping first and
+        // then failing canAdd() would silently discard the popped message.
+        if (maxNumMessages > 0 && messages->size() >= maxNumMessages) {
+            break;
+        }
+        if (!incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0))) {
+            break;  // queue is empty
+        }
+        if (!messages->canAdd(peekMsg)) {
+            // NOTE(review): reached only through the byte limit. peekMsg has
+            // already been removed from the queue and is dropped here; fixing
+            // this needs a way to inspect the queue head without removing it.
+            break;
+        }
+        messageProcessed(peekMsg);
+        messages->add(peekMsg);
+    }
+    // Capture a shared_ptr to this consumer in the posted task.
+    auto self = get_shared_this_ptr();
+    listenerExecutor_->postWork(
+        [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+}
+
+// Bookkeeping for a message taken out of the incoming queue: subtracts its
+// length from incomingMessagesSize_ and adds its id to the un-acked tracker.
+void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
+    const auto length = msg.getLength();
+    incomingMessagesSize_.fetch_sub(length);
+    const auto& messageId = msg.getMessageId();
+    unAckedMessageTrackerPtr_->add(messageId);
+}
+
+// Returns a shared_ptr to this object typed as MultiTopicsConsumerImpl,
+// obtained by down-casting the pointer returned by shared_from_this().
+std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
+    auto self = shared_from_this();
+    return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(self);
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 8769d59..044f417 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -38,17 +38,8 @@ namespace pulsar {
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
class MultiTopicsConsumerImpl;
-class MultiTopicsConsumerImpl : public ConsumerImplBase,
- public std::enable_shared_from_this<MultiTopicsConsumerImpl> {
+class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
- enum MultiTopicsConsumerState
- {
- Pending,
- Ready,
- Closing,
- Closed,
- Failed
- };
MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
@@ -99,16 +90,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
const ClientImplPtr client_;
const std::string subscriptionName_;
std::string consumerStr_;
- std::string topic_;
const ConsumerConfiguration conf_;
typedef SynchronizedHashMap<std::string, ConsumerImplPtr> ConsumerMap;
ConsumerMap consumers_;
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
- std::atomic<MultiTopicsConsumerState> state_{Pending};
- BlockingQueue<Message> messages_;
- const ExecutorServicePtr listenerExecutor_;
+ BlockingQueue<Message> incomingMessages_;
+ std::atomic_int incomingMessagesSize_ = {0};
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
boost::posix_time::time_duration partitionsUpdateInterval_;
@@ -125,6 +114,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
unsigned int partitionIndex);
void notifyResult(CloseCallback closeCallback);
void messageReceived(Consumer consumer, const Message& msg);
+ void messageProcessed(Message& msg);
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
@@ -149,8 +139,12 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
ConsumerSubResultPromisePtr topicSubResultPromise,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate);
+ // impl consumer base virtual method
+ bool hasEnoughMessagesForBatchReceive() const override;
+ void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
private:
+ std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 54a07bb..d3e424e 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -4098,3 +4098,184 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
consumer.close();
client.close();
}
+
+void testBatchReceive(bool multiConsumer) {
+ // Covers batchReceive() and batchReceiveAsync(); multiConsumer first makes the topic partitioned.
+ Client client(lookupUrl);
+
+ std::string uniqueChunk = unique_str();
+ std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+ if (multiConsumer) {
+ // Call the admin API to make the topic partitioned ("5" is the requested partition count).
+ std::string url =
+ adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+ int res = makePutRequest(url, "5");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);  // only HTTP 204 and 409 are accepted
+ }
+
+ std::string subName = "subscription-name";
+ Producer producer;
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ // maxNumMessages (1000) exceeds the receiver queue size (10), so a batch is expected to complete at 10.
+ consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+ consumerConfig.setReceiverQueueSize(10);
+ consumerConfig.setProperty("consumer-name", "test-consumer-name");
+ consumerConfig.setProperty("consumer-id", "test-consumer-id");
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consumerConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // sync batch receive test; fail fast if any send does not succeed
+ std::string prefix = "batch-receive-msg";
+ int numOfMessages = 10;
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ Messages messages;
+ Result receive = consumer.batchReceive(messages);
+ ASSERT_EQ(receive, ResultOk);
+ ASSERT_EQ(messages.size(), numOfMessages);
+
+ // async batch receive test (EXPECT_* in the callback so a failed check still releases the latch)
+ Latch latch(1);
+ BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+ EXPECT_EQ(result, ResultOk);
+ EXPECT_EQ(messages.size(), numOfMessages);
+ latch.countdown();
+ };
+ consumer.batchReceiveAsync(batchReceiveCallback);
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+ producer.close();
+ consumer.close();
+ client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(/*multiConsumer=*/false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(/*multiConsumer=*/true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+ // With a 1000 ms batch timeout, 10 messages (below maxNumMessages 1000) must still be delivered.
+ Client client(lookupUrl);
+ std::string uniqueChunk = unique_str();
+ std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+ if (multiConsumer) {
+ // Call the admin API to make the topic partitioned ("5" is the requested partition count).
+ std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+ uniqueChunk + "/partitions";
+ int res = makePutRequest(url, "5");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);  // only HTTP 204 and 409 are accepted
+ }
+
+ std::string subName = "subscription-name";
+ Producer producer;
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consumerConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ std::string prefix = "batch-receive-msg";
+ int numOfMessages = 10;
+
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));  // fail fast instead of waiting on the latch
+ }
+
+ Latch latch(1);
+ BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+ EXPECT_EQ(result, ResultOk);  // EXPECT_* so a failed check still releases the latch
+ EXPECT_EQ(messages.size(), numOfMessages);
+ latch.countdown();
+ };
+ consumer.batchReceiveAsync(batchReceiveCallback);
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+ producer.close();
+ consumer.close();
+ client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(/*multiConsumer=*/false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(/*multiConsumer=*/true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+ // A pending batchReceiveAsync() is expected to complete with ResultAlreadyClosed once the consumer closes.
+ Client client(lookupUrl);
+
+ std::string uniqueChunk = unique_str();
+ std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+ if (multiConsumer) {
+ // Call the admin API to make the topic partitioned ("5" is the requested partition count).
+ std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+ uniqueChunk + "/partitions";
+ int res = makePutRequest(url, "5");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);  // only HTTP 204 and 409 are accepted
+ }
+
+ std::string subName = "subscription-name";
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, consumerConfig,
+ WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ Result result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ Latch latch(1);
+ BatchReceiveCallback batchReceiveCallback = [&latch](Result result, Messages messages) {
+ EXPECT_EQ(result, ResultAlreadyClosed);  // EXPECT_* so a failed check still releases the latch
+ latch.countdown();
+ };
+ consumer.batchReceiveAsync(batchReceiveCallback);
+ consumer.close();
+ client.close();
+
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(/*multiConsumer=*/false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(/*multiConsumer=*/true); }
diff --git a/tests/BatchReceivePolicyTest.cc b/tests/BatchReceivePolicyTest.cc
new file mode 100644
index 0000000..ab9ffcc
--- /dev/null
+++ b/tests/BatchReceivePolicyTest.cc
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/BatchReceivePolicy.h>
+
+using namespace pulsar;
+
+TEST(BatchReceivePolicyTest, testBatchReceivePolicy) {
+ ASSERT_THROW(BatchReceivePolicy(-1, -1, -1), std::invalid_argument);  // all three arguments -1 is rejected
+ // Default-constructed policy: maxNumMessages -1, maxNumBytes 10 * 1024 * 1024, timeout 100 ms.
+ {
+ BatchReceivePolicy batchReceivePolicy;
+ ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+ ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+ ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 100);
+ }
+ // (-1, -1, 123): the timeout is taken as given; the other two getters match the default policy.
+ {
+ BatchReceivePolicy batchReceivePolicy(-1, -1, 123);
+ ASSERT_EQ(batchReceivePolicy.getMaxNumMessages(), -1);
+ ASSERT_EQ(batchReceivePolicy.getMaxNumBytes(), 10 * 1024 * 1024);
+ ASSERT_EQ(batchReceivePolicy.getTimeoutMs(), 123);
+ }
+}
diff --git a/tests/ConsumerConfigurationTest.cc b/tests/ConsumerConfigurationTest.cc
index 24f541b..20cd8f4 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -61,6 +61,9 @@ TEST(ConsumerConfigurationTest, testDefaultConfig) {
ASSERT_EQ(conf.getPriorityLevel(), 0);
ASSERT_EQ(conf.getMaxPendingChunkedMessage(), 10);
ASSERT_EQ(conf.isAutoAckOldestChunkedMessageOnQueueFull(), false);
+ ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), -1);
+ ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10 * 1024 * 1024);
+ ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
}
TEST(ConsumerConfigurationTest, testCustomConfig) {
@@ -151,6 +154,11 @@ TEST(ConsumerConfigurationTest, testCustomConfig) {
conf.setAutoAckOldestChunkedMessageOnQueueFull(true);
ASSERT_TRUE(conf.isAutoAckOldestChunkedMessageOnQueueFull());
+
+ conf.setBatchReceivePolicy(BatchReceivePolicy(10, 10, 100));
+ ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumMessages(), 10);
+ ASSERT_EQ(conf.getBatchReceivePolicy().getMaxNumBytes(), 10);
+ ASSERT_EQ(conf.getBatchReceivePolicy().getTimeoutMs(), 100);
}
TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
diff --git a/tests/MessagesImplTest.cc b/tests/MessagesImplTest.cc
new file mode 100644
index 0000000..e963501
--- /dev/null
+++ b/tests/MessagesImplTest.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+ // 0. No limits: with both constructor arguments set to -1, canAdd() accepts a message.
+ {
+ MessagesImpl messages(-1, -1);
+ ASSERT_TRUE(messages.canAdd(Message()));
+ }
+
+ // 1. Max number of messages: after 10 adds with a limit of 10, canAdd() is false and add() throws.
+ {
+ Message msg = MessageBuilder().setContent("c").build();
+ MessagesImpl messages(10, -1);
+ for (int i = 0; i < 10; i++) {
+ messages.add(msg);
+ }
+ ASSERT_FALSE(messages.canAdd(msg));
+ ASSERT_EQ(messages.size(), 10);
+ ASSERT_THROW(messages.add(msg), std::invalid_argument);
+ messages.clear();  // clearing makes room again
+ ASSERT_TRUE(messages.canAdd(msg));
+ ASSERT_EQ(messages.size(), 0);
+ }
+
+ // 2. Max total size (presumably bytes of content): ten 1-character messages reach the limit of 10.
+ {
+ Message msg = MessageBuilder().setContent("c").build();
+ MessagesImpl messages(-1, 10);
+ for (int i = 0; i < 10; i++) {
+ messages.add(msg);
+ }
+ ASSERT_FALSE(messages.canAdd(msg));
+ ASSERT_EQ(messages.size(), 10);
+ ASSERT_THROW(messages.add(msg), std::invalid_argument);
+ messages.clear();  // clearing makes room again
+ ASSERT_TRUE(messages.canAdd(msg));
+ ASSERT_EQ(messages.size(), 0);
+ }
+}