You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/21 22:51:22 UTC

[pulsar] branch master updated: [cpp-client] add support of receiveAsync API (#3389)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02cddfd  [cpp-client] add support of receiveAsync API (#3389)
02cddfd is described below

commit 02cddfd8386860045378a16afa33820622f0759f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Jan 21 14:51:17 2019 -0800

    [cpp-client] add support of receiveAsync API (#3389)
    
    ### Motivation
    
    In many cases, a client requires a receiveAsync() API in Consumer. This API is already available in the Java client but does not exist in the C++ client.
    
    ### Modification
    
    Add support for the receiveAsync() API in the cpp-client consumer.
    
    This PR is rebased and reopened from #577
---
 pulsar-client-cpp/include/pulsar/Consumer.h        |  13 +
 .../include/pulsar/ConsumerConfiguration.h         |   1 +
 pulsar-client-cpp/lib/Consumer.cc                  |   9 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  87 +++++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   5 +
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   1 +
 pulsar-client-cpp/lib/HandlerBase.h                |   1 +
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  56 +++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   5 +
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  58 +++-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   5 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 316 ++++++++++++++++++++-
 12 files changed, 532 insertions(+), 25 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index 4486515..4f1d6d4 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -100,6 +100,19 @@ class Consumer {
     Result receive(Message& msg, int timeoutMs);
 
     /**
+     * Receive a single message
+     * <p>
+     * Retrieves a message once one is available and completes the callback with the received message.
+     * </p>
+     * <p>
+     * receiveAsync() should be called again once the callback has completed with a received message.
+     * Otherwise it creates a <i>backlog of receive requests</i> in the application.
+     * </p>
+     * @param callback the ReceiveCallback that is completed when a message is available
+     */
+    void receiveAsync(ReceiveCallback callback);
+
+    /**
      * Acknowledge the reception of a single message.
      *
      * This method will block until an acknowledgement is sent to the broker. After
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 60ffef1..69c17e4 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -36,6 +36,7 @@ class PulsarWrapper;
 
 /// Callback definition for non-data operation
 typedef boost::function<void(Result result)> ResultCallback;
+typedef boost::function<void(Result, const Message& msg)> ReceiveCallback;
 
 /// Callback definition for MessageListener
 typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener;
diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc
index d89dd73..cbe44fe 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -73,6 +73,15 @@ Result Consumer::receive(Message& msg, int timeoutMs) {
     return impl_->receive(msg, timeoutMs);
 }
 
+void Consumer::receiveAsync(ReceiveCallback callback) {
+    if (!impl_) {
+        Message msg;
+        callback(ResultConsumerNotInitialized, msg);
+        return;
+    }
+    impl_->receiveAsync(callback);
+}
+
 Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); }
 
 Result Consumer::acknowledge(const MessageId& messageId) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 35a18db..4a24dfd 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -48,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
       startMessageId_(startMessageId),
       // This is the initial capacity of the queue
       incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
+      pendingReceives_(),
       availablePermits_(conf.getReceiverQueueSize()),
       consumerId_(client->newConsumerId()),
       consumerName_(config_.getConsumerName()),
@@ -290,6 +291,22 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         Lock lock(mutex_);
         numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m);
     } else {
+        Lock lock(pendingReceiveMutex_);
+        // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
+        bool asyncReceivedWaiting = !pendingReceives_.empty();
+        ReceiveCallback callback;
+        if (asyncReceivedWaiting) {
+            callback = pendingReceives_.front();
+            pendingReceives_.pop();
+        }
+        lock.unlock();
+
+        if (asyncReceivedWaiting) {
+            listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                    shared_from_this(), ResultOk, m, callback));
+            return;
+        }
+
         // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message`
         if (config_.getReceiverQueueSize() != 0 ||
             (config_.getReceiverQueueSize() == 0 && messageListener_)) {
@@ -316,6 +333,27 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
     }
 }
 
+void ConsumerImpl::failPendingReceiveCallback() {
+    Message msg;
+    Lock lock(pendingReceiveMutex_);
+    while (!pendingReceives_.empty()) {
+        ReceiveCallback callback = pendingReceives_.front();
+        pendingReceives_.pop();
+        listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                shared_from_this(), ResultAlreadyClosed, msg, callback));
+    }
+    lock.unlock();
+}
+
+void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+                                                 const ReceiveCallback& callback) {
+    if (result == ResultOk && config_.getReceiverQueueSize() != 0) {
+        messageProcessed(msg);
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    }
+    callback(result, msg);
+}
+
 // Zero Queue size is not supported with Batch Messages
 uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
                                                           Message& batchedMessage) {
@@ -345,8 +383,19 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
             }
         }
 
-        // Regular path, append individual message to incoming messages queue
-        incomingMessages_.push(msg);
+        // If a receiveAsync request is pending, hand the message to its callback instead of queueing it
+        Lock lock(pendingReceiveMutex_);
+        if (!pendingReceives_.empty()) {
+            ReceiveCallback callback = pendingReceives_.front();
+            pendingReceives_.pop();
+            lock.unlock();
+            listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                    shared_from_this(), ResultOk, msg, callback));
+        } else {
+            // Regular path, append individual message to incoming messages queue
+            incomingMessages_.push(msg);
+            lock.unlock();
+        }
     }
 
     if (skippedMessages > 0) {
@@ -509,6 +558,37 @@ Result ConsumerImpl::receive(Message& msg) {
     return res;
 }
 
+void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+    Message msg;
+
+    // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, msg);
+        return;
+    }
+    stateLock.unlock();
+
+    Lock lock(pendingReceiveMutex_);
+    if (incomingMessages_.pop(msg, milliseconds(0))) {
+        lock.unlock();
+        messageProcessed(msg);
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        callback(ResultOk, msg);
+    } else {
+        pendingReceives_.push(callback);
+        lock.unlock();
+
+        if (config_.getReceiverQueueSize() == 0) {
+            ClientConnectionPtr currentCnx = getCnx().lock();
+            if (currentCnx) {
+                LOG_DEBUG(getName() << "Send more permits: " << 1);
+                receiveMessages(currentCnx, 1);
+            }
+        }
+    }
+}
+
 Result ConsumerImpl::receiveHelper(Message& msg) {
     {
         Lock lock(mutex_);
@@ -747,6 +827,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     if (!callback.empty()) {
         future.addListener(boost::bind(&ConsumerImpl::handleClose, shared_from_this(), _1, callback));
     }
+
+    // fail pendingReceive callback
+    failPendingReceiveCallback();
 }
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index fcdaed1..62ac9da 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -42,6 +42,7 @@
 #include <lib/BrokerConsumerStatsImpl.h>
 #include <lib/stats/ConsumerStatsImpl.h>
 #include <lib/stats/ConsumerStatsDisabled.h>
+#include <queue>
 
 using namespace pulsar;
 
@@ -92,6 +93,7 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual const std::string& getTopic() const;
     virtual Result receive(Message& msg);
     virtual Result receive(Message& msg, int timeout);
+    virtual void receiveAsync(ReceiveCallback& callback);
     Result fetchSingleMessageFromBroker(Message& msg);
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -140,6 +142,8 @@ class ConsumerImpl : public ConsumerImplBase,
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
+    void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
+    void failPendingReceiveCallback();
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -156,6 +160,7 @@ class ConsumerImpl : public ConsumerImplBase,
 
     Optional<MessageId> lastDequedMessage_;
     UnboundedBlockingQueue<Message> incomingMessages_;
+    std::queue<ReceiveCallback> pendingReceives_;
     int availablePermits_;
     uint64_t consumerId_;
     std::string consumerName_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 11f2fc6..0b1b6fb 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -35,6 +35,7 @@ class ConsumerImplBase {
     virtual const std::string& getTopic() const = 0;
     virtual Result receive(Message& msg) = 0;
     virtual Result receive(Message& msg, int timeout) = 0;
+    virtual void receiveAsync(ReceiveCallback& callback) = 0;
     virtual void unsubscribeAsync(ResultCallback callback) = 0;
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index 4ad0800..f1a23fc 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -91,6 +91,7 @@ class HandlerBase {
     const std::string topic_;
     ClientConnectionWeakPtr connection_;
     boost::mutex mutex_;
+    boost::mutex pendingReceiveMutex_;
     ptime creationTimestamp_;
 
     const TimeDuration operationTimeut_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 69c3cc0..329442a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -34,6 +34,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
       messages_(1000),
       listenerExecutor_(client->getListenerExecutorProvider()->get()),
       messageListener_(conf.getMessageListener()),
+      pendingReceives_(),
       namespaceName_(topicName ? topicName->getNamespaceName() : boost::shared_ptr<NamespaceName>()),
       lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)),
@@ -385,6 +386,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
                                             shared_from_this(), _1, topicPartitionName, callback));
     }
+
+    // fail pending receive callbacks
+    failPendingReceiveCallback();
 }
 
 void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
@@ -429,11 +433,23 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
                                                           << " message:" << msg.getDataAsString());
     const std::string& topicPartitionName = consumer.getTopic();
     msg.impl_->setTopicName(topicPartitionName);
-    messages_.push(msg);
 
-    if (messageListener_) {
-        listenerExecutor_->postWork(
-            boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+    Lock lock(pendingReceiveMutex_);
+    if (!pendingReceives_.empty()) {
+        ReceiveCallback callback = pendingReceives_.front();
+        pendingReceives_.pop();
+        lock.unlock();
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+    } else {
+        if (messages_.full()) {
+            lock.unlock();
+        }
+        messages_.push(msg);
+        if (messageListener_) {
+            listenerExecutor_->postWork(
+                boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer));
+        }
     }
 }
 
@@ -489,6 +505,38 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
     }
 }
 
+void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+    Message msg;
+
+    // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, msg);
+        return;
+    }
+    stateLock.unlock();
+
+    Lock lock(pendingReceiveMutex_);
+    if (messages_.pop(msg, milliseconds(0))) {
+        lock.unlock();
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        callback(ResultOk, msg);
+    } else {
+        pendingReceives_.push(callback);
+    }
+}
+
+void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
+    Message msg;
+    Lock lock(pendingReceiveMutex_);
+    while (!pendingReceives_.empty()) {
+        ReceiveCallback callback = pendingReceives_.front();
+        pendingReceives_.pop();
+        listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+    }
+    lock.unlock();
+}
+
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     if (state_ != Ready) {
         callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 3b1d985..ddea13a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -22,6 +22,7 @@
 #include "ClientImpl.h"
 #include "BlockingQueue.h"
 #include <vector>
+#include <queue>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include "boost/enable_shared_from_this.hpp"
@@ -58,6 +59,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     virtual const std::string& getName() const;
     virtual Result receive(Message& msg);
     virtual Result receive(Message& msg, int timeout);
+    virtual void receiveAsync(ReceiveCallback& callback);
     virtual void unsubscribeAsync(ResultCallback callback);
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -91,6 +93,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     ConsumerMap consumers_;
     std::map<std::string, int> topicsPartitions_;
     boost::mutex mutex_;
+    boost::mutex pendingReceiveMutex_;
     MultiTopicsConsumerState state_;
     boost::shared_ptr<std::atomic<int>> numberTopicPartitions_;
     LookupServicePtr lookupServicePtr_;
@@ -100,6 +103,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string>& topics_;
+    std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
     void setState(MultiTopicsConsumerState state);
@@ -112,6 +116,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
     void receiveMessages();
+    void failPendingReceiveCallback();
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                   boost::shared_ptr<std::atomic<int>> topicsNeedCreate);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e3f93c7..c0edf11 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -37,6 +37,7 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
       messages_(1000),
       listenerExecutor_(client->getListenerExecutorProvider()->get()),
       messageListener_(conf.getMessageListener()),
+      pendingReceives_(),
       topic_(topicName->toString()) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
@@ -95,6 +96,27 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
     }
 }
 
+void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+    Message msg;
+
+    // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, msg);
+        return;
+    }
+    stateLock.unlock();
+
+    Lock lock(pendingReceiveMutex_);
+    if (messages_.pop(msg, milliseconds(0))) {
+        lock.unlock();
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        callback(ResultOk, msg);
+    } else {
+        pendingReceives_.push(callback);
+    }
+}
+
 void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing");
     // change state to Closing, so that no Ready state operation is permitted during unsubscribe
@@ -283,6 +305,9 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
             }
         }
     }
+
+    // fail pending receive callbacks
+    failPendingReceiveCallback();
 }
 
 void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
@@ -316,13 +341,38 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
     LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
     const std::string& topicPartitionName = consumer.getTopic();
     msg.impl_->setTopicName(topicPartitionName);
-    messages_.push(msg);
-    if (messageListener_) {
-        listenerExecutor_->postWork(
-            boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
+    // messages_ is a blocking queue: if it is already full, the lock is not needed, because receiveAsync
+    // will find an available message and will not add a request to pendingReceives_
+    Lock lock(pendingReceiveMutex_);
+    if (!pendingReceives_.empty()) {
+        ReceiveCallback callback = pendingReceives_.front();
+        pendingReceives_.pop();
+        lock.unlock();
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+    } else {
+        if (messages_.full()) {
+            lock.unlock();
+        }
+        messages_.push(msg);
+        if (messageListener_) {
+            listenerExecutor_->postWork(
+                boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
+        }
     }
 }
 
+void PartitionedConsumerImpl::failPendingReceiveCallback() {
+    Message msg;
+    Lock lock(pendingReceiveMutex_);
+    while (!pendingReceives_.empty()) {
+        ReceiveCallback callback = pendingReceives_.front();
+        pendingReceives_.pop();
+        listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+    }
+    lock.unlock();
+}
+
 void PartitionedConsumerImpl::internalListener(Consumer consumer) {
     Message m;
     messages_.pop(m);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 606c007..c20a7df 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -21,6 +21,7 @@
 #include "ConsumerImpl.h"
 #include "ClientImpl.h"
 #include <vector>
+#include <queue>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include "boost/enable_shared_from_this.hpp"
@@ -52,6 +53,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     virtual const std::string& getTopic() const;
     virtual Result receive(Message& msg);
     virtual Result receive(Message& msg, int timeout);
+    virtual void receiveAsync(ReceiveCallback& callback);
     virtual void unsubscribeAsync(ResultCallback callback);
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
@@ -80,6 +82,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     typedef std::vector<ConsumerImplPtr> ConsumerList;
     ConsumerList consumers_;
     boost::mutex mutex_;
+    boost::mutex pendingReceiveMutex_;
     PartitionedConsumerState state_;
     unsigned int unsubscribedSoFar_;
     BlockingQueue<Message> messages_;
@@ -99,8 +102,10 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
     void receiveMessages();
+    void failPendingReceiveCallback();
     Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    std::queue<ReceiveCallback> pendingReceives_;
 };
 typedef boost::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
 typedef boost::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index a594a0a..9a34fbe 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -25,6 +25,7 @@
 #include <lib/Latch.h>
 #include <sstream>
 #include "boost/date_time/posix_time/posix_time.hpp"
+#include "boost/enable_shared_from_this.hpp"
 #include "CustomRoutingPolicy.h"
 #include <boost/thread.hpp>
 #include <boost/thread/mutex.hpp>
@@ -59,19 +60,41 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message&
     latch.countdown();
 }
 
-static void sendCallBack(Result r, const Message& msg, std::string prefix) {
+static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count) {
+    static boost::mutex sendMutex_;
+    sendMutex_.lock();
     ASSERT_EQ(r, ResultOk);
-    std::string messageContent = prefix + boost::lexical_cast<std::string>(globalTestBatchMessagesCounter++);
+    std::string messageContent = prefix + boost::lexical_cast<std::string>(*count);
     ASSERT_EQ(messageContent, msg.getDataAsString());
     LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
+    *count += 1;
+    sendMutex_.unlock();
+}
+
+static void receiveCallBack(Result r, const Message& msg, std::string& messageContent, bool checkContent,
+                            bool* isFailed, int* count) {
+    static boost::mutex receiveMutex_;
+    receiveMutex_.lock();
+
+    if (r == ResultOk) {
+        LOG_DEBUG("received msg " << msg.getDataAsString() << " expected: " << messageContent
+                                  << " count =" << *count);
+        if (checkContent) {
+            ASSERT_EQ(messageContent, msg.getDataAsString());
+        }
+        *count += 1;
+    } else {
+        *isFailed = true;
+    }
+    receiveMutex_.unlock();
 }
 
 static void sendCallBack(Result r, const Message& msg, std::string prefix, double percentage,
-                         uint64_t delayInMicros) {
+                         uint64_t delayInMicros, int* count) {
     if ((rand() % 100) <= percentage) {
         usleep(delayInMicros);
     }
-    sendCallBack(r, msg, prefix);
+    sendCallBack(r, msg, prefix, count);
 }
 
 class EncKeyReader : public CryptoKeyReader {
@@ -158,13 +181,14 @@ TEST(BasicEndToEndTest, testBatchMessages) {
 
     // Send Asynchronously
     std::string prefix = "msg-batch-";
+    int msgCount = 0;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
         Message msg = MessageBuilder()
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -172,15 +196,16 @@ TEST(BasicEndToEndTest, testBatchMessages) {
     int i = 0;
     while (consumer.receive(receivedMsg, 5000) == ResultOk) {
         std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
-        LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
-                                                       << receivedMsg.getMessageId() << "]");
+        LOG_INFO("Received Message with [ content - "
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+        LOG_INFO("msg-index " << receivedMsg.getProperty("msgIndex") << ", expected "
+                              << boost::lexical_cast<std::string>(i));
         ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
         ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
         ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
     }
     // Number of messages produced
-    ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages);
-    globalTestBatchMessagesCounter = 0;
+    ASSERT_EQ(msgCount, numOfMessages);
     // Number of messages consumed
     ASSERT_EQ(i, numOfMessages);
 }
@@ -983,13 +1008,14 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
 
     // Send Asynchronously
     std::string prefix = "msg-stats-";
+    int count = 0;
     for (int i = 0; i < numOfMessages; i++) {
         std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
         Message msg = MessageBuilder()
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3, &count));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -2004,13 +2030,14 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
 
     // Send Asynchronously of half the messages
     std::string prefix = "msg-batch-async";
+    int msgCount = 0;
     for (int i = 0; i < numOfMessages / 2; i++) {
         std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
         Message msg = MessageBuilder()
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending first half messages in async, should timeout to receive");
@@ -2026,7 +2053,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending the other half messages in async, should able to receive");
@@ -2039,7 +2066,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
     while (consumer.receive(receivedMsg, 1000) == ResultOk) {
         std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
         LOG_INFO("Received Message with [ content - "
-                 << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"
+                 << "property = " << receivedMsg.getProperty("msgIndex"));
         ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++));
         ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
         ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
@@ -2234,13 +2262,14 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
 
     // Send Asynchronously of half the messages
     std::string prefix = "msg-batch-async";
+    int msgCount = 0;
     for (int i = 0; i < numOfMessages / 2; i++) {
         std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
         Message msg = MessageBuilder()
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending half of messages in async, should timeout to receive");
@@ -2265,7 +2294,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
                           .setContent(messageContent)
                           .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                           .build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO(
@@ -2385,3 +2414,260 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
     producer.close();
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testReceiveAsync) {
+    // Queues receiveAsync() calls before publishing, then expects count == totalMsgs and no failure flag.
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/receiveAsync";
+    std::string subName = "my-sub-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);  // blocks until the async creation completes
+    ASSERT_EQ(ResultOk, result);
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);  // blocks until the async subscribe completes
+    ASSERT_EQ(ResultOk, result);
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    std::string content = "msg-1-content";  // payload of every message sent below
+    int count = 0;          // handed to receiveCallBack by pointer; presumably bumped per delivery
+    int totalMsgs = 5;
+    bool isFailed = false;  // handed to receiveCallBack by pointer; presumably set on a bad result
+    for (int i = 0; i < totalMsgs; i++) {
+        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, true, &isFailed, &count));
+    }
+    // Send synchronously, only after all receiveAsync() calls have been issued
+    for (int i = 0; i < totalMsgs; i++) {
+        Message msg = MessageBuilder().setContent(content).build();
+        result = producer.send(msg);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    // wait up to ~3s for the callbacks; NOTE(review): count/isFailed are polled without synchronization
+    for (int i = 0; i < 3; i++) {
+        if (count == totalMsgs) {
+            break;
+        }
+        usleep(1 * 1000 * 1000);  // 1 second
+    }
+    ASSERT_FALSE(isFailed);
+    ASSERT_EQ(count, totalMsgs);
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testPartitionedReceiveAsync) {  // receiveAsync() on a partitioned topic
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/receiveAsync-partition";
+
+    // call the admin API to make the topic partitioned before any producer/consumer attaches
+    std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-partition/partitions";
+    int res = makePutRequest(url, "3");  // body "3": presumably the requested partition count
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 (presumably "already exists") is accepted
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "subscription-A", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    int totalMsgs = 10;
+    std::string content;    // left empty; passed with `false` below, presumably skipping the payload check
+    int count = 0;          // handed to receiveCallBack by pointer; presumably bumped per delivery
+    bool isFailed = false;  // handed to receiveCallBack by pointer; presumably set on a bad result
+    for (int i = 0; i < totalMsgs; i++) {
+        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+    }
+
+    for (int i = 0; i < totalMsgs; i++) {
+        boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time());
+        long nanoSeconds = t.time_of_day().total_nanoseconds();
+        std::stringstream ss;
+        ss << nanoSeconds;  // the time-of-day reading serves as both payload and partition key
+        Message msg = MessageBuilder().setContent(ss.str()).setPartitionKey(ss.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+        LOG_DEBUG("Message Timestamp is " << msg.getPublishTimestamp());
+        LOG_DEBUG("Message is " << msg);
+    }
+
+    // wait up to ~3s for the callbacks; NOTE(review): count/isFailed are polled without synchronization
+    for (int i = 0; i < 3; i++) {
+        if (count == totalMsgs) {
+            break;
+        }
+        usleep(1 * 1000 * 1000);  // 1 second
+    }
+    ASSERT_FALSE(isFailed);
+    ASSERT_EQ(count, totalMsgs);
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testBatchMessagesReceiveAsync) {  // receiveAsync() fed by a batching producer
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/receiveAsync-batch";
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    // Enable batching on producer side
+    int batchSize = 2;
+    int numOfMessages = 100;
+
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.setBatchingMaxMessages(batchSize);
+    conf.setBatchingEnabled(true);
+    conf.setBlockIfQueueFull(true);
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);  // blocks until the async creation completes
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);  // blocks until the async subscribe completes
+    ASSERT_EQ(ResultOk, result);
+
+    // handling dangling subscriptions: the unsubscribe is best-effort, the re-subscribe must succeed
+    consumer.unsubscribe();
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    std::string content;    // left empty; passed with `false` below, presumably skipping the payload check
+    int count = 0;          // handed to receiveCallBack by pointer; presumably bumped per delivery
+    bool isFailed = false;  // handed to receiveCallBack by pointer; presumably set on a bad result
+    for (int i = 0; i < numOfMessages; i++) {
+        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+    }
+
+    // Send Asynchronously
+    std::string prefix = "msg-batch-";
+    int msgCount = 0;  // handed to sendCallBack by pointer; not asserted in this test
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        LOG_DEBUG("sending message " << messageContent);
+    }
+
+    // wait up to ~3s for the callbacks; NOTE(review): count/isFailed are polled without synchronization
+    for (int i = 0; i < 3; i++) {
+        if (count == numOfMessages) {
+            break;
+        }
+        usleep(1 * 1000 * 1000);  // 1 second
+    }
+    ASSERT_FALSE(isFailed);
+    ASSERT_EQ(count, numOfMessages);
+    client.shutdown();  // release the client, as every other receiveAsync test in this hunk does
+}
+
+TEST(BasicEndToEndTest, testReceiveAsyncFailedConsumer) {
+    // A receiveAsync() pending at close() and one issued after close() are both expected to be flagged.
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/receiveAsync-failed";
+    std::string subName = "my-sub-name";
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);  // blocks until the async subscribe completes
+    ASSERT_EQ(ResultOk, result);
+
+    bool isFailedOnConsumerClosing = false;
+    std::string content;  // left empty; passed with `false` below, presumably skipping the payload check
+    int closingCount = 0;
+    // issued while the consumer is still open; this test publishes nothing, so it should fail on close()
+    consumer.receiveAsync(
+        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCount));
+
+    // close consumer
+    consumer.close();
+    bool isFailedOnConsumerClosed = false;
+    int count = 0;
+    // the consumer is already closed: this callback should fail immediately
+    consumer.receiveAsync(
+        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+
+    // wait up to ~3s for both flags; NOTE(review): the flags are polled without synchronization
+    for (int i = 0; i < 3; i++) {
+        if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
+            break;
+        }
+        usleep(1 * 1000 * 1000);  // 1 second
+    }
+
+    ASSERT_TRUE(isFailedOnConsumerClosing);
+    ASSERT_TRUE(isFailedOnConsumerClosed);
+    ASSERT_EQ(count, 0);  // the post-close callback must not have counted a message
+
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {  // partitioned-topic variant
+    Client client(lookupUrl);
+    std::string topicName = "persistent://public/default/receiveAsync-fail-partition";
+
+    // call the admin API to make the topic partitioned before the consumer attaches
+    std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-fail-partition/partitions";
+    int res = makePutRequest(url, "3");  // body "3": presumably the requested partition count
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 (presumably "already exists") is accepted
+
+    Consumer consumer;
+    Result result = client.subscribe(topicName, "subscription-A", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    bool isFailedOnConsumerClosing = false;
+    std::string content;  // left empty; passed with `false` below, presumably skipping the payload check
+    int closingCount = 0;
+    // issued while the consumer is still open; this test publishes nothing, so it should fail on close()
+    consumer.receiveAsync(
+        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCount));
+    // close consumer
+    consumer.close();
+
+    int count = 0;
+    bool isFailedOnConsumerClosed = false;  // flag for the call made after close()
+    consumer.receiveAsync(
+        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+
+    // wait up to ~3s for both flags; NOTE(review): the flags are polled without synchronization
+    for (int i = 0; i < 3; i++) {
+        if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
+            break;
+        }
+        usleep(1 * 1000 * 1000);  // 1 second
+    }
+
+    ASSERT_TRUE(isFailedOnConsumerClosing);
+    ASSERT_TRUE(isFailedOnConsumerClosed);
+    ASSERT_EQ(count, 0);  // the post-close callback must not have counted a message
+    client.shutdown();
+}