You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/21 22:51:22 UTC
[pulsar] branch master updated: [cpp-client] add support of
receiveAsync API (#3389)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02cddfd [cpp-client] add support of receiveAsync API (#3389)
02cddfd is described below
commit 02cddfd8386860045378a16afa33820622f0759f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jan 21 14:51:17 2019 -0800
[cpp-client] add support of receiveAsync API (#3389)
### Motivation
In many cases, client requires receiveAsync() api in Consumer. This api is already available into java-client but doesn't exist into CPP-client.
### Modification
Add support for receiveAsync() api in cpp-client consumer.
This PR is rebased and reopened from #577
---
pulsar-client-cpp/include/pulsar/Consumer.h | 13 +
.../include/pulsar/ConsumerConfiguration.h | 1 +
pulsar-client-cpp/lib/Consumer.cc | 9 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 87 +++++-
pulsar-client-cpp/lib/ConsumerImpl.h | 5 +
pulsar-client-cpp/lib/ConsumerImplBase.h | 1 +
pulsar-client-cpp/lib/HandlerBase.h | 1 +
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 56 +++-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 +
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 58 +++-
pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 5 +
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 316 ++++++++++++++++++++-
12 files changed, 532 insertions(+), 25 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index 4486515..4f1d6d4 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -100,6 +100,19 @@ class Consumer {
Result receive(Message& msg, int timeoutMs);
/**
+ * Receive a single message
+ * <p>
+ * Retrieves a message when it will be available and completes callback with received message.
+ * </p>
+ * <p>
+ * receiveAsync() should be called subsequently once callback gets completed with received message.
+ * Else it creates <i> backlog of receive requests </i> in the application.
+ * </p>
+ * @param ReceiveCallback will be completed when message is available
+ */
+ void receiveAsync(ReceiveCallback callback);
+
+ /**
* Acknowledge the reception of a single message.
*
* This method will block until an acknowledgement is sent to the broker. After
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 60ffef1..69c17e4 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -36,6 +36,7 @@ class PulsarWrapper;
/// Callback definition for non-data operation
typedef boost::function<void(Result result)> ResultCallback;
+typedef boost::function<void(Result, const Message& msg)> ReceiveCallback;
/// Callback definition for MessageListener
typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener;
diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc
index d89dd73..cbe44fe 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -73,6 +73,15 @@ Result Consumer::receive(Message& msg, int timeoutMs) {
return impl_->receive(msg, timeoutMs);
}
+void Consumer::receiveAsync(ReceiveCallback callback) {
+ if (!impl_) {
+ Message msg;
+ callback(ResultConsumerNotInitialized, msg);
+ return;
+ }
+ impl_->receiveAsync(callback);
+}
+
Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
Result Consumer::acknowledge(const MessageId& messageId) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 35a18db..4a24dfd 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -48,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
startMessageId_(startMessageId),
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
+ pendingReceives_(),
availablePermits_(conf.getReceiverQueueSize()),
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
@@ -290,6 +291,22 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m);
} else {
+ Lock lock(pendingReceiveMutex_);
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+ bool asyncReceivedWaiting = !pendingReceives_.empty();
+ ReceiveCallback callback;
+ if (asyncReceivedWaiting) {
+ callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ }
+ lock.unlock();
+
+ if (asyncReceivedWaiting) {
+ listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultOk, m, callback));
+ return;
+ }
+
// config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
if (config_.getReceiverQueueSize() != 0 ||
(config_.getReceiverQueueSize() == 0 && messageListener_)) {
@@ -316,6 +333,27 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
}
}
+void ConsumerImpl::failPendingReceiveCallback() {
+ Message msg;
+ Lock lock(pendingReceiveMutex_);
+ while (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultAlreadyClosed, msg, callback));
+ }
+ lock.unlock();
+}
+
+void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+ const ReceiveCallback& callback) {
+ if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
+ messageProcessed(msg);
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ }
+ callback(result, msg);
+}
+
// Zero Queue size is not supported with Batch Messages
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
Message& batchedMessage) {
@@ -345,8 +383,19 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
}
}
- // Regular path, append individual message to incoming messages queue
- incomingMessages_.push(msg);
+ //
+ Lock lock(pendingReceiveMutex_);
+ if (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ lock.unlock();
+ listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultOk, msg, callback));
+ } else {
+ // Regular path, append individual message to incoming messages queue
+ incomingMessages_.push(msg);
+ lock.unlock();
+ }
}
if (skippedMessages > 0) {
@@ -509,6 +558,37 @@ Result ConsumerImpl::receive(Message& msg) {
return res;
}
+void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+ Message msg;
+
+ // fail the callback if consumer is closing or closed
+ Lock stateLock(mutex_);
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed, msg);
+ return;
+ }
+ stateLock.unlock();
+
+ Lock lock(pendingReceiveMutex_);
+ if (incomingMessages_.pop(msg, milliseconds(0))) {
+ lock.unlock();
+ messageProcessed(msg);
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ callback(ResultOk, msg);
+ } else {
+ pendingReceives_.push(callback);
+ lock.unlock();
+
+ if (config_.getReceiverQueueSize() == 0) {
+ ClientConnectionPtr currentCnx = getCnx().lock();
+ if (currentCnx) {
+ LOG_DEBUG(getName() << "Send more permits: " << 1);
+ receiveMessages(currentCnx, 1);
+ }
+ }
+ }
+}
+
Result ConsumerImpl::receiveHelper(Message& msg) {
{
Lock lock(mutex_);
@@ -747,6 +827,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
if (!callback.empty()) {
future.addListener(boost::bind(&ConsumerImpl::handleClose, shared_from_this(), _1, callback));
}
+
+ // fail pendingReceive callback
+ failPendingReceiveCallback();
}
void ConsumerImpl::handleClose(Result result, ResultCallback callback) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index fcdaed1..62ac9da 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -42,6 +42,7 @@
#include <lib/BrokerConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsImpl.h>
#include <lib/stats/ConsumerStatsDisabled.h>
+#include <queue>
using namespace pulsar;
@@ -92,6 +93,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual const std::string& getTopic() const;
virtual Result receive(Message& msg);
virtual Result receive(Message& msg, int timeout);
+ virtual void receiveAsync(ReceiveCallback& callback);
Result fetchSingleMessageFromBroker(Message& msg);
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -140,6 +142,8 @@ class ConsumerImpl : public ConsumerImplBase,
Result receiveHelper(Message& msg);
Result receiveHelper(Message& msg, int timeout);
void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+ void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
+ void failPendingReceiveCallback();
Optional<MessageId> clearReceiveQueue();
@@ -156,6 +160,7 @@ class ConsumerImpl : public ConsumerImplBase,
Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
+ std::queue<ReceiveCallback> pendingReceives_;
int availablePermits_;
uint64_t consumerId_;
std::string consumerName_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 11f2fc6..0b1b6fb 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -35,6 +35,7 @@ class ConsumerImplBase {
virtual const std::string& getTopic() const = 0;
virtual Result receive(Message& msg) = 0;
virtual Result receive(Message& msg, int timeout) = 0;
+ virtual void receiveAsync(ReceiveCallback& callback) = 0;
virtual void unsubscribeAsync(ResultCallback callback) = 0;
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index 4ad0800..f1a23fc 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -91,6 +91,7 @@ class HandlerBase {
const std::string topic_;
ClientConnectionWeakPtr connection_;
boost::mutex mutex_;
+ boost::mutex pendingReceiveMutex_;
ptime creationTimestamp_;
const TimeDuration operationTimeut_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 69c3cc0..329442a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -34,6 +34,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
messages_(1000),
listenerExecutor_(client->getListenerExecutorProvider()->get()),
messageListener_(conf.getMessageListener()),
+ pendingReceives_(),
namespaceName_(topicName ? topicName->getNamespaceName() : boost::shared_ptr<NamespaceName>()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)),
@@ -385,6 +386,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
shared_from_this(), _1, topicPartitionName, callback));
}
+
+ // fail pending recieve
+ failPendingReceiveCallback();
}
void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
@@ -429,11 +433,23 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
<< " message:" << msg.getDataAsString());
const std::string& topicPartitionName = consumer.getTopic();
msg.impl_->setTopicName(topicPartitionName);
- messages_.push(msg);
- if (messageListener_) {
- listenerExecutor_->postWork(
- boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+ Lock lock(pendingReceiveMutex_);
+ if (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ lock.unlock();
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+ } else {
+ if (messages_.full()) {
+ lock.unlock();
+ }
+ messages_.push(msg);
+ if (messageListener_) {
+ listenerExecutor_->postWork(
+ boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+ }
}
}
@@ -489,6 +505,38 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
}
}
+void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+ Message msg;
+
+ // fail the callback if consumer is closing or closed
+ Lock stateLock(mutex_);
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed, msg);
+ return;
+ }
+ stateLock.unlock();
+
+ Lock lock(pendingReceiveMutex_);
+ if (messages_.pop(msg, milliseconds(0))) {
+ lock.unlock();
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ callback(ResultOk, msg);
+ } else {
+ pendingReceives_.push(callback);
+ }
+}
+
+void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
+ Message msg;
+ Lock lock(pendingReceiveMutex_);
+ while (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+ }
+ lock.unlock();
+}
+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 3b1d985..ddea13a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -22,6 +22,7 @@
#include "ClientImpl.h"
#include "BlockingQueue.h"
#include <vector>
+#include <queue>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include "boost/enable_shared_from_this.hpp"
@@ -58,6 +59,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
virtual const std::string& getName() const;
virtual Result receive(Message& msg);
virtual Result receive(Message& msg, int timeout);
+ virtual void receiveAsync(ReceiveCallback& callback);
virtual void unsubscribeAsync(ResultCallback callback);
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -91,6 +93,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
ConsumerMap consumers_;
std::map<std::string, int> topicsPartitions_;
boost::mutex mutex_;
+ boost::mutex pendingReceiveMutex_;
MultiTopicsConsumerState state_;
boost::shared_ptr<std::atomic<int>> numberTopicPartitions_;
LookupServicePtr lookupServicePtr_;
@@ -100,6 +103,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
+ std::queue<ReceiveCallback> pendingReceives_;
/* methods */
void setState(MultiTopicsConsumerState state);
@@ -112,6 +116,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void messageReceived(Consumer consumer, const Message& msg);
void internalListener(Consumer consumer);
void receiveMessages();
+ void failPendingReceiveCallback();
void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
boost::shared_ptr<std::atomic<int>> topicsNeedCreate);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e3f93c7..c0edf11 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -37,6 +37,7 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
messages_(1000),
listenerExecutor_(client->getListenerExecutorProvider()->get()),
messageListener_(conf.getMessageListener()),
+ pendingReceives_(),
topic_(topicName->toString()) {
std::stringstream consumerStrStream;
consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
@@ -95,6 +96,27 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
}
}
+void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+ Message msg;
+
+ // fail the callback if consumer is closing or closed
+ Lock stateLock(mutex_);
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed, msg);
+ return;
+ }
+ stateLock.unlock();
+
+ Lock lock(pendingReceiveMutex_);
+ if (messages_.pop(msg, milliseconds(0))) {
+ lock.unlock();
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ callback(ResultOk, msg);
+ } else {
+ pendingReceives_.push(callback);
+ }
+}
+
void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing");
// change state to Closing, so that no Ready state operation is permitted during unsubscribe
@@ -283,6 +305,9 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
}
}
}
+
+ // fail pending recieve
+ failPendingReceiveCallback();
}
void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
@@ -316,13 +341,38 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
const std::string& topicPartitionName = consumer.getTopic();
msg.impl_->setTopicName(topicPartitionName);
- messages_.push(msg);
- if (messageListener_) {
- listenerExecutor_->postWork(
- boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
+ // messages_ is a blocking queue: if queue is already full then no need of lock as receiveAsync already
+ // gets available-msg and no need to put request in pendingReceives_
+ Lock lock(pendingReceiveMutex_);
+ if (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ lock.unlock();
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+ } else {
+ if (messages_.full()) {
+ lock.unlock();
+ }
+ messages_.push(msg);
+ if (messageListener_) {
+ listenerExecutor_->postWork(
+ boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
+ }
}
}
+void PartitionedConsumerImpl::failPendingReceiveCallback() {
+ Message msg;
+ Lock lock(pendingReceiveMutex_);
+ while (!pendingReceives_.empty()) {
+ ReceiveCallback callback = pendingReceives_.front();
+ pendingReceives_.pop();
+ listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+ }
+ lock.unlock();
+}
+
void PartitionedConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 606c007..c20a7df 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -21,6 +21,7 @@
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include <vector>
+#include <queue>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include "boost/enable_shared_from_this.hpp"
@@ -52,6 +53,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
virtual const std::string& getTopic() const;
virtual Result receive(Message& msg);
virtual Result receive(Message& msg, int timeout);
+ virtual void receiveAsync(ReceiveCallback& callback);
virtual void unsubscribeAsync(ResultCallback callback);
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -80,6 +82,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
typedef std::vector<ConsumerImplPtr> ConsumerList;
ConsumerList consumers_;
boost::mutex mutex_;
+ boost::mutex pendingReceiveMutex_;
PartitionedConsumerState state_;
unsigned int unsubscribedSoFar_;
BlockingQueue<Message> messages_;
@@ -99,8 +102,10 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void messageReceived(Consumer consumer, const Message& msg);
void internalListener(Consumer consumer);
void receiveMessages();
+ void failPendingReceiveCallback();
Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+ std::queue<ReceiveCallback> pendingReceives_;
};
typedef boost::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
typedef boost::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index a594a0a..9a34fbe 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -25,6 +25,7 @@
#include <lib/Latch.h>
#include <sstream>
#include "boost/date_time/posix_time/posix_time.hpp"
+#include "boost/enable_shared_from_this.hpp"
#include "CustomRoutingPolicy.h"
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
@@ -59,19 +60,41 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message&
latch.countdown();
}
-static void sendCallBack(Result r, const Message& msg, std::string prefix) {
+static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count) {
+ static boost::mutex sendMutex_;
+ sendMutex_.lock();
ASSERT_EQ(r, ResultOk);
- std::string messageContent = prefix + boost::lexical_cast<std::string>(globalTestBatchMessagesCounter++);
+ std::string messageContent = prefix + boost::lexical_cast<std::string>(*count);
ASSERT_EQ(messageContent, msg.getDataAsString());
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
+ *count += 1;
+ sendMutex_.unlock();
+}
+
+static void receiveCallBack(Result r, const Message& msg, std::string& messageContent, bool checkContent,
+ bool* isFailed, int* count) {
+ static boost::mutex receiveMutex_;
+ receiveMutex_.lock();
+
+ if (r == ResultOk) {
+ LOG_DEBUG("received msg " << msg.getDataAsString() << " expected: " << messageContent
+ << " count =" << *count);
+ if (checkContent) {
+ ASSERT_EQ(messageContent, msg.getDataAsString());
+ }
+ *count += 1;
+ } else {
+ *isFailed = true;
+ }
+ receiveMutex_.unlock();
}
static void sendCallBack(Result r, const Message& msg, std::string prefix, double percentage,
- uint64_t delayInMicros) {
+ uint64_t delayInMicros, int* count) {
if ((rand() % 100) <= percentage) {
usleep(delayInMicros);
}
- sendCallBack(r, msg, prefix);
+ sendCallBack(r, msg, prefix, count);
}
class EncKeyReader : public CryptoKeyReader {
@@ -158,13 +181,14 @@ TEST(BasicEndToEndTest, testBatchMessages) {
// Send Asynchronously
std::string prefix = "msg-batch-";
+ int msgCount = 0;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder()
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
LOG_DEBUG("sending message " << messageContent);
}
@@ -172,15 +196,16 @@ TEST(BasicEndToEndTest, testBatchMessages) {
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
- LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
- << receivedMsg.getMessageId() << "]");
+ LOG_INFO("Received Message with [ content - "
+ << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+ LOG_INFO("msg-index " << receivedMsg.getProperty("msgIndex") << ", expected "
+ << boost::lexical_cast<std::string>(i));
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Number of messages produced
- ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
- globalTestBatchMessagesCounter = 0;
+ ASSERT_EQ(msgCount, numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
}
@@ -983,13 +1008,14 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
// Send Asynchronously
std::string prefix = "msg-stats-";
+ int count = 0;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder()
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3, &count));
LOG_DEBUG("sending message " << messageContent);
}
@@ -2004,13 +2030,14 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
// Send Asynchronously of half the messages
std::string prefix = "msg-batch-async";
+ int msgCount = 0;
for (int i = 0; i < numOfMessages / 2; i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder()
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
LOG_DEBUG("async sending message " << messageContent);
}
LOG_INFO("sending first half messages in async, should timeout to receive");
@@ -2026,7 +2053,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
LOG_DEBUG("async sending message " << messageContent);
}
LOG_INFO("sending the other half messages in async, should able to receive");
@@ -2039,7 +2066,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
while (consumer.receive(receivedMsg, 1000) == ResultOk) {
std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
LOG_INFO("Received Message with [ content - "
- << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+ << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"
+ << "property = " << receivedMsg.getProperty("msgIndex"));
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
@@ -2234,13 +2262,14 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
// Send Asynchronously of half the messages
std::string prefix = "msg-batch-async";
+ int msgCount = 0;
for (int i = 0; i < numOfMessages / 2; i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder()
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
LOG_DEBUG("async sending message " << messageContent);
}
LOG_INFO("sending half of messages in async, should timeout to receive");
@@ -2265,7 +2294,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
.setContent(messageContent)
.setProperty("msgIndex", boost::lexical_cast<std::string>(i))
.build();
- producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
LOG_DEBUG("async sending message " << messageContent);
}
LOG_INFO(
@@ -2385,3 +2414,260 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
producer.close();
client.shutdown();
}
+
+TEST(BasicEndToEndTest, testReceiveAsync) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/receiveAsync";
+ std::string subName = "my-sub-name";
+ Producer producer;
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ std::string content = "msg-1-content";
+ int count = 0;
+ int totalMsgs = 5;
+ bool isFailed = false;
+ for (int i = 0; i < totalMsgs; i++) {
+ consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, true, &isFailed, &count));
+ }
+ // Send synchronously
+ for (int i = 0; i < totalMsgs; i++) {
+ Message msg = MessageBuilder().setContent(content).build();
+ result = producer.send(msg);
+ ASSERT_EQ(ResultOk, result);
+ }
+
+ // check strategically
+ for (int i = 0; i < 3; i++) {
+ if (count == totalMsgs) {
+ break;
+ }
+ usleep(1 * 1000 * 1000);
+ }
+ ASSERT_FALSE(isFailed);
+ ASSERT_EQ(count, totalMsgs);
+ client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testPartitionedReceiveAsync) {
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/receiveAsync-partition";
+
+ // call admin api to make it partitioned
+ std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-partition/partitions";
+ int res = makePutRequest(url, "3");
+
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, "subscription-A", consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ int totalMsgs = 10;
+ std::string content;
+ int count = 0;
+ bool isFailed = false;
+ for (int i = 0; i < totalMsgs; i++) {
+ consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+ }
+
+ for (int i = 0; i < totalMsgs; i++) {
+ boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time());
+ long nanoSeconds = t.time_of_day().total_nanoseconds();
+ std::stringstream ss;
+ ss << nanoSeconds;
+ Message msg = MessageBuilder().setContent(ss.str()).setPartitionKey(ss.str()).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ LOG_DEBUG("Message Timestamp is " << msg.getPublishTimestamp());
+ LOG_DEBUG("Message is " << msg);
+ }
+
+ // check strategically
+ for (int i = 0; i < 3; i++) {
+ if (count == totalMsgs) {
+ break;
+ }
+ usleep(1 * 1000 * 1000);
+ }
+ ASSERT_FALSE(isFailed);
+ ASSERT_EQ(count, totalMsgs);
+ client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testBatchMessagesReceiveAsync) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/receiveAsync-batch";
+ std::string subName = "subscription-name";
+ Producer producer;
+
+ // Enable batching on producer side
+ int batchSize = 2;
+ int numOfMessages = 100;
+
+ ProducerConfiguration conf;
+ conf.setCompressionType(CompressionLZ4);
+ conf.setBatchingMaxMessages(batchSize);
+ conf.setBatchingEnabled(true);
+ conf.setBlockIfQueueFull(true);
+
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // handling dangling subscriptions
+ consumer.unsubscribe();
+ client.subscribe(topicName, subName, consumer);
+
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ std::string content;
+ int count = 0;
+ bool isFailed = false;
+ for (int i = 0; i < numOfMessages; i++) {
+ consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+ }
+
+ // Send Asynchronously
+ std::string prefix = "msg-batch-";
+ int msgCount = 0;
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+ LOG_DEBUG("sending message " << messageContent);
+ }
+
+ // check strategically
+ for (int i = 0; i < 3; i++) {
+ if (count == numOfMessages) {
+ break;
+ }
+ usleep(1 * 1000 * 1000);
+ }
+ ASSERT_FALSE(isFailed);
+ ASSERT_EQ(count, numOfMessages);
+}
+
+TEST(BasicEndToEndTest, testReceiveAsyncFailedConsumer) {
+ ClientConfiguration config;
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/receiveAsync-failed";
+ std::string subName = "my-sub-name";
+
+ Consumer consumer;
+ Promise<Result, Consumer> consumerPromise;
+ client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+ Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+ Result result = consumerFuture.get(consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ bool isFailedOnConsumerClosing = false;
+ std::string content;
+ int closingCunt = 0;
+ // callback should immediately fail
+ consumer.receiveAsync(
+ boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCunt));
+
+ // close consumer
+ consumer.close();
+ bool isFailedOnConsumerClosed = false;
+ int count = 0;
+ // callback should immediately fail
+ consumer.receiveAsync(
+ boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+
+ // check strategically
+ for (int i = 0; i < 3; i++) {
+ if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
+ break;
+ }
+ usleep(1 * 1000 * 1000);
+ }
+
+ ASSERT_TRUE(isFailedOnConsumerClosing);
+ ASSERT_TRUE(isFailedOnConsumerClosed);
+ ASSERT_EQ(count, 0);
+
+ client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
+ Client client(lookupUrl);
+ std::string topicName = "persistent://public/default/receiveAsync-fail-partition";
+
+ // call admin api to make it partitioned
+ std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-fail-partition/partitions";
+ int res = makePutRequest(url, "3");
+
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ Consumer consumer;
+ Result result = client.subscribe(topicName, "subscription-A", consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ bool isFailedOnConsumerClosing = false;
+ std::string content;
+ int closingCunt = 0;
+ // callback should immediately fail
+ consumer.receiveAsync(
+ boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCunt));
+ // close consumer
+ consumer.close();
+
+ int count = 0;
+ bool isFailedOnConsumerClosed = false;
+ consumer.receiveAsync(
+ boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+
+ // check strategically
+ for (int i = 0; i < 3; i++) {
+ if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
+ break;
+ }
+ usleep(1 * 1000 * 1000);
+ }
+
+ ASSERT_TRUE(isFailedOnConsumerClosing);
+ ASSERT_TRUE(isFailedOnConsumerClosed);
+ ASSERT_EQ(count, 0);
+ client.shutdown();
+}