You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gi...@git.apache.org on 2017/08/28 19:45:35 UTC

[GitHub] jai1 commented on a change in pull request #717: Reader API for C++ client

jai1 commented on a change in pull request #717: Reader API for C++ client
URL: https://github.com/apache/incubator-pulsar/pull/717#discussion_r135611949
 
 

 ##########
 File path: pulsar-client-cpp/lib/ConsumerImpl.cc
 ##########
 @@ -490,6 +526,29 @@ void ConsumerImpl::messageProcessed(Message& msg) {
     increaseAvailablePermits(currentCnx);
 }
 
+/**
+ * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
+ * not seen by the application
+ */
+Optional<BatchMessageId> ConsumerImpl::clearReceiveQueue() {
+    Message nextMessageInQueue;
+    if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
+        // There was at least one message pending in the queue
+        // We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message id,
+        // irrespective of whether they were part of a batch or not.
+        const MessageId& nextMessageId = static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId());
+        BatchMessageId previousMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1);
+        return Optional<BatchMessageId>::of(previousMessageId);
+    } else if (lastDequedMessage_.is_present()) {
 
 Review comment:
   return (lastDequedMessage_.is_present() ? lastDequedMessage_ : startMessageId_)
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services