You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/02 06:27:07 UTC

[pulsar] branch branch-2.8 updated (328f30c -> 02b208c)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 328f30c  Revert "Fix Regression in Consumer Performance"
     new 60fe05c  [pulsar-client-cpp] Expose getLastMessageId in the Reader API (#11723)
     new 5293e2a  [C++] Fix hasMessageAvailable returns wrong value for last message (#13883)
     new 02b208c  [C++] Fix flaky tests about reference count (#14854)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-client-cpp/include/pulsar/Reader.h | 12 ++++++++
 pulsar-client-cpp/lib/ConsumerImpl.cc     |  4 +++
 pulsar-client-cpp/lib/Reader.cc           | 15 +++++++++
 pulsar-client-cpp/lib/ReaderImpl.cc       |  9 +++---
 pulsar-client-cpp/lib/ReaderImpl.h        |  5 ++-
 pulsar-client-cpp/tests/ClientTest.cc     | 26 ++++++++++++++--
 pulsar-client-cpp/tests/PulsarFriend.h    |  7 +++++
 pulsar-client-cpp/tests/ReaderTest.cc     | 51 +++----------------------------
 pulsar-client-cpp/tests/ReaderTest.h      | 32 -------------------
 9 files changed, 72 insertions(+), 89 deletions(-)
 delete mode 100644 pulsar-client-cpp/tests/ReaderTest.h

[pulsar] 03/03: [C++] Fix flaky tests about reference count (#14854)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 02b208cc581b3de13c72d2db1210893f3139cb57
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Apr 1 19:09:14 2022 +0800

    [C++] Fix flaky tests about reference count (#14854)
    
    Fixes #14848
    Fixes #14719
    
    ### Motivation
    
    #7793 introduced a `testReferenceLeak` to avoid cyclic referenece of the
    reader. However, it adds a unused field `readerImplWeakPtr_` only for
    tests. The access to this field is not thread safe that the write
    operation happens in `handleConsumerCreated` while the read operation
    can happen anywhere via the getter. So there is a little chance that
    `readerPtr` in `testReferenceLeak` doesn't point to the right object.
    
    In addition, we should only guarantee the reference count becomes 0
    after the producer, consumer or reader goes out of its scope. #14797
    adds a `ClientTest.testReferenceCount` but it's also flaky. It's caused
    by the shared pointer of `ProducerImpl` is published to another thread
    via `shared_from_this()` but the test has a strong expectation that the
    reference count is exactly 1.
    
    ### Modifications
    
    - Remove `readerImplWeakPtr_` from `ReaderImpl` and get the weak pointer
      from `Reader` directly by adding a method to `PulsarFriend`.
    - Add the check of reader's reference count to `testReferenceCount` and
      remove the redundant `testReferenceLeak`.
    - Instead of asserting the reference count of producer/consumer/reader
      is 1, just assume the it's greater than 0.
    
    (cherry picked from commit f84ff571df95f99efa4596e65324def1084fc11b)
---
 pulsar-client-cpp/lib/ReaderImpl.cc    |  5 ----
 pulsar-client-cpp/lib/ReaderImpl.h     |  5 +---
 pulsar-client-cpp/tests/ClientTest.cc  | 26 +++++++++++++++--
 pulsar-client-cpp/tests/PulsarFriend.h |  7 +++++
 pulsar-client-cpp/tests/ReaderTest.cc  | 51 +++-------------------------------
 pulsar-client-cpp/tests/ReaderTest.h   | 32 ---------------------
 6 files changed, 35 insertions(+), 91 deletions(-)

diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 0a7b321..9401c12 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -90,11 +90,8 @@ const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic();
 void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
     auto self = shared_from_this();
     readerCreatedCallback_(result, Reader(self));
-    readerImplWeakPtr_ = self;
 }
 
-ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
-
 Result ReaderImpl::readNext(Message& msg) {
     Result res = consumer_->receive(msg);
     acknowledgeIfNecessary(res, msg);
@@ -144,8 +141,6 @@ void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
     });
 }
 
-ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
-
 bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index a546ae8..6de6c02 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -53,7 +53,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
 
-    ConsumerImplPtr getConsumer();
+    ConsumerImplBaseWeakPtr getConsumer() const noexcept { return consumer_; }
 
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
 
@@ -62,8 +62,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     void getLastMessageIdAsync(GetLastMessageIdCallback callback);
 
-    ReaderImplWeakPtr getReaderImplWeakPtr();
-
     bool isConnected() const;
 
    private:
@@ -79,7 +77,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     ConsumerImplPtr consumer_;
     ReaderCallback readerCreatedCallback_;
     ReaderListener readerListener_;
-    ReaderImplWeakPtr readerImplWeakPtr_;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 920430d..1ba0164 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -24,6 +24,9 @@
 #include <future>
 #include <pulsar/Client.h>
 #include "../lib/checksum/ChecksumProvider.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
@@ -184,22 +187,39 @@ TEST(ClientTest, testReferenceCount) {
 
     auto &producers = PulsarFriend::getProducers(client);
     auto &consumers = PulsarFriend::getConsumers(client);
+    ReaderImplWeakPtr readerWeakPtr;
 
     {
         Producer producer;
         ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
         ASSERT_EQ(producers.size(), 1);
-        ASSERT_EQ(producers[0].use_count(), 1);
+        ASSERT_TRUE(producers[0].use_count() > 0);
+        LOG_INFO("Reference count of the producer: " << producers[0].use_count());
 
         Consumer consumer;
         ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
         ASSERT_EQ(consumers.size(), 1);
-        ASSERT_EQ(consumers[0].use_count(), 1);
+        ASSERT_TRUE(consumers[0].use_count() > 0);
+        LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
+
+        ReaderConfiguration readerConf;
+        Reader reader;
+        ASSERT_EQ(ResultOk,
+                  client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
+        ASSERT_EQ(consumers.size(), 2);
+        ASSERT_TRUE(consumers[1].use_count() > 0);
+        LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+        readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
+        ASSERT_EQ(readerWeakPtr.use_count(), 1);
+        LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
     ASSERT_EQ(producers.size(), 1);
     ASSERT_EQ(producers[0].use_count(), 0);
-    ASSERT_EQ(consumers.size(), 1);
+    ASSERT_EQ(consumers.size(), 2);
     ASSERT_EQ(consumers[0].use_count(), 0);
+    ASSERT_EQ(consumers[1].use_count(), 0);
+    ASSERT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 74aa1f7..2d9b558 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -25,6 +25,7 @@
 #include "lib/ConsumerImpl.h"
 #include "lib/PartitionedConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
+#include "lib/ReaderImpl.h"
 
 using std::string;
 
@@ -79,6 +80,12 @@ class PulsarFriend {
         return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
     }
 
+    static ConsumerImplPtr getConsumer(Reader reader) {
+        return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
+    }
+
+    static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
+
     static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) {
         return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
     }
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index 8cd535c..bf15692 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -18,8 +18,8 @@
  */
 #include <pulsar/Client.h>
 #include <pulsar/Reader.h>
-#include "ReaderTest.h"
 #include "HttpHelper.h"
+#include "PulsarFriend.h"
 
 #include <gtest/gtest.h>
 
@@ -28,6 +28,7 @@
 
 #include <lib/Latch.h>
 #include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
@@ -423,50 +424,6 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
     client.close();
 }
 
-TEST(ReaderTest, testReferenceLeak) {
-    Client client(serviceUrl);
-
-    std::string topicName = "persistent://public/default/testReferenceLeak";
-
-    Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
-
-    for (int i = 0; i < 10; i++) {
-        std::string content = "my-message-" + std::to_string(i);
-        Message msg = MessageBuilder().setContent(content).build();
-        ASSERT_EQ(ResultOk, producer.send(msg));
-    }
-
-    ReaderConfiguration readerConf;
-    Reader reader;
-    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
-
-    ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
-    ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
-
-    LOG_INFO("1 consumer use count " << consumerPtr.use_count());
-    LOG_INFO("1 reader use count " << readerPtr.use_count());
-
-    for (int i = 0; i < 10; i++) {
-        Message msg;
-        ASSERT_EQ(ResultOk, reader.readNext(msg));
-
-        std::string content = msg.getDataAsString();
-        std::string expected = "my-message-" + std::to_string(i);
-        ASSERT_EQ(expected, content);
-    }
-
-    producer.close();
-    reader.close();
-    // will be released after exit this method.
-    ASSERT_EQ(1, consumerPtr.use_count());
-    ASSERT_EQ(1, readerPtr.use_count());
-    client.close();
-    // will be released after exit this method.
-    ASSERT_EQ(1, consumerPtr.use_count());
-    ASSERT_EQ(1, readerPtr.use_count());
-}
-
 TEST(ReaderTest, testPartitionIndex) {
     Client client(serviceUrl);
 
@@ -519,7 +476,7 @@ TEST(ReaderTest, testSubscriptionNameSetting) {
     Reader reader;
     ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
 
-    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+    ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
 
     reader.close();
     client.close();
@@ -537,7 +494,7 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
     Reader reader;
     ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
 
-    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+    ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());
 
     reader.close();
     client.close();
diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h
deleted file mode 100644
index fd0387f..0000000
--- a/pulsar-client-cpp/tests/ReaderTest.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "lib/ReaderImpl.h"
-#include <string>
-
-using std::string;
-
-namespace pulsar {
-class ReaderTest {
-   public:
-    static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
-    static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
-        return reader.impl_->getReaderImplWeakPtr();
-    }
-};
-}  // namespace pulsar

[pulsar] 02/03: [C++] Fix hasMessageAvailable returns wrong value for last message (#13883)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5293e2a3a97be85799336ebf168bca6e090e1c50
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 24 12:06:00 2022 +0800

    [C++] Fix hasMessageAvailable returns wrong value for last message (#13883)
    
    ### Motivation
    
    In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.
    
    ### Modifications
    
    The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.
    
    Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.
    
    To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from https://github.com/apache/pulsar/pull/9652 to compare with `last_message_id` when `startMessageId` is latest.
    
    ### Verifying this change
    
    This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.
    
    (cherry picked from commit e50493ea17dd5f2f9d4527d74cc4f40e12439df2)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ++++
 pulsar-client-cpp/lib/ReaderImpl.cc   | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index fa872f5..1f4a6b5 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -165,6 +165,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
         return;
     }
 
+    // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
+    // sending the subscribe request.
+    cnx->registerConsumer(consumerId_, shared_from_this());
+
     Lock lockForMessageId(mutexForMessageId_);
     Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
     if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 48f5d58..0a7b321 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
 }
 
 void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
-    consumer_->getLastMessageIdAsync(callback);
+    consumer_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
+        callback(result, response.getLastMessageId());
+    });
 }
 
 ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }

[pulsar] 01/03: [pulsar-client-cpp] Expose getLastMessageId in the Reader API (#11723)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 60fe05c672d9e7502cf96e9df46ea3fb4ed97c90
Author: VadimMolodyh <mo...@gmail.com>
AuthorDate: Tue Aug 24 14:49:17 2021 +0300

    [pulsar-client-cpp] Expose getLastMessageId in the Reader API (#11723)
    
    ### Motivation
    
    The changes are trivial. getLastMessageIdAsync is already implemented in the ConsumerImpl class but it is only used internally for checking if there are any available messages in the topic. It is really helpful to have it exposed in the Reader API e.g., to be able to read all messages currently available in the topic. I.e., to get last message id and then to read all messages till this id. (hasMessageAvailable is not helpful because it potentially might always return 'false' if new me [...]
    
    ### Modifications
    
    Trivial changes of ReaderImpl and Reader classes to expose getLastMessageId.
    
    (cherry picked from commit 640e63b232cf24c186088d2019201836f0c5b5ad)
---
 pulsar-client-cpp/include/pulsar/Reader.h | 12 ++++++++++++
 pulsar-client-cpp/lib/Reader.cc           | 15 +++++++++++++++
 pulsar-client-cpp/lib/ReaderImpl.cc       |  4 ++++
 pulsar-client-cpp/lib/ReaderImpl.h        |  2 ++
 4 files changed, 33 insertions(+)

diff --git a/pulsar-client-cpp/include/pulsar/Reader.h b/pulsar-client-cpp/include/pulsar/Reader.h
index 727b012..04d6fb8 100644
--- a/pulsar-client-cpp/include/pulsar/Reader.h
+++ b/pulsar-client-cpp/include/pulsar/Reader.h
@@ -29,6 +29,7 @@ class PulsarFriend;
 class ReaderImpl;
 
 typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
+typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /**
  * A Reader can be used to scan through all the messages currently available in a topic.
@@ -137,6 +138,17 @@ class PULSAR_PUBLIC Reader {
      */
     bool isConnected() const;
 
+    /**
+     * Asynchronously get an ID of the last available message or a message ID with -1 as an entryId if the
+     * topic is empty.
+     */
+    void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
+    /**
+     * Get an ID of the last available message or a message ID with -1 as an entryId if the topic is empty.
+     */
+    Result getLastMessageId(MessageId& messageId);
+
    private:
     typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
     ReaderImplPtr impl_;
diff --git a/pulsar-client-cpp/lib/Reader.cc b/pulsar-client-cpp/lib/Reader.cc
index 3327558..fa48536 100644
--- a/pulsar-client-cpp/lib/Reader.cc
+++ b/pulsar-client-cpp/lib/Reader.cc
@@ -117,4 +117,19 @@ Result Reader::seek(uint64_t timestamp) {
 
 bool Reader::isConnected() const { return impl_ && impl_->isConnected(); }
 
+void Reader::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    if (!impl_) {
+        callback(ResultConsumerNotInitialized, MessageId());
+        return;
+    }
+    impl_->getLastMessageIdAsync(callback);
+}
+
+Result Reader::getLastMessageId(MessageId& messageId) {
+    Promise<Result, MessageId> promise;
+
+    getLastMessageIdAsync(WaitForCallbackValue<MessageId>(promise));
+    return promise.getFuture().get(messageId);
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index ccde6e7..48f5d58 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -138,6 +138,10 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
     consumer_->seekAsync(timestamp, callback);
 }
 
+void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    consumer_->getLastMessageIdAsync(callback);
+}
+
 ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
 
 bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 7c7a556..a546ae8 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -60,6 +60,8 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     void seekAsync(const MessageId& msgId, ResultCallback callback);
     void seekAsync(uint64_t timestamp, ResultCallback callback);
 
+    void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
     ReaderImplWeakPtr getReaderImplWeakPtr();
 
     bool isConnected() const;