You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/06/23 13:58:30 UTC

[pulsar] branch master updated: [client][c++] add getLastMessageIdAsync in Consumer (#16182)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c13d1c7c171 [client][c++] add getLastMessageIdAsync in Consumer (#16182)
c13d1c7c171 is described below

commit c13d1c7c17166750461913dc1395c53a90a84bc5
Author: komalatammal <10...@users.noreply.github.com>
AuthorDate: Thu Jun 23 09:58:22 2022 -0400

    [client][c++] add getLastMessageIdAsync in Consumer (#16182)
    
    ### Motivation
    
    Add getLastMessageId method to C++ Consumer.cc to address the missing part in this PR https://github.com/apache/pulsar/pull/15993
    
    ### Modifications
    
    Expose methods to get last message Id in consumer, the C++ client's Consumer class
---
 pulsar-client-cpp/include/pulsar/Consumer.h             | 11 +++++++++++
 .../include/pulsar/ConsumerConfiguration.h              |  1 +
 pulsar-client-cpp/include/pulsar/Reader.h               |  1 -
 pulsar-client-cpp/include/pulsar/ReaderConfiguration.h  |  1 +
 pulsar-client-cpp/lib/Consumer.cc                       | 17 +++++++++++++++++
 5 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index e82d2c07fbc..f1e180aea2d 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -390,6 +390,17 @@ class PULSAR_PUBLIC Consumer {
      */
     bool isConnected() const;
 
+    /**
+     * Asynchronously get an ID of the last available message or a message ID with -1 as an entryId if the
+     * topic is empty.
+     */
+    void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
+    /**
+     * Get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.
+     */
+    Result getLastMessageId(MessageId& messageId);
+
    private:
     ConsumerImplBasePtr impl_;
     explicit Consumer(ConsumerImplBasePtr);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 0898b95736a..b326ca8fb31 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -40,6 +40,7 @@ class PulsarWrapper;
 /// Callback definition for non-data operation
 typedef std::function<void(Result result)> ResultCallback;
 typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
+typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /// Callback definition for MessageListener
 typedef std::function<void(Consumer consumer, const Message& msg)> MessageListener;
diff --git a/pulsar-client-cpp/include/pulsar/Reader.h b/pulsar-client-cpp/include/pulsar/Reader.h
index 04d6fb86c78..554788e8cd6 100644
--- a/pulsar-client-cpp/include/pulsar/Reader.h
+++ b/pulsar-client-cpp/include/pulsar/Reader.h
@@ -29,7 +29,6 @@ class PulsarFriend;
 class ReaderImpl;
 
 typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
-typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /**
  * A Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 3d0af205f98..5b88553534a 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -35,6 +35,7 @@ class PulsarWrapper;
 
 /// Callback definition for non-data operation
 typedef std::function<void(Result result)> ResultCallback;
+typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /// Callback definition for MessageListener
 typedef std::function<void(Reader reader, const Message& msg)> ReaderListener;
diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc
index 3dcd3b54bcc..5d163629128 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -250,4 +250,21 @@ Result Consumer::seek(uint64_t timestamp) {
 
 bool Consumer::isConnected() const { return impl_ && impl_->isConnected(); }
 
+void Consumer::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    if (!impl_) {
+        callback(ResultConsumerNotInitialized, MessageId());
+        return;
+    }
+    getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
+        callback(result, response.getLastMessageId());
+    });
+}
+
+Result Consumer::getLastMessageId(MessageId& messageId) {
+    Promise<Result, MessageId> promise;
+
+    getLastMessageIdAsync(WaitForCallbackValue<MessageId>(promise));
+    return promise.getFuture().get(messageId);
+}
+
 }  // namespace pulsar