You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gi...@apache.org on 2020/11/12 01:50:26 UTC

[pulsar] branch pulsarbot/cherry-pick-8519 created (now 056eb50)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch pulsarbot/cherry-pick-8519
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


      at 056eb50  [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)

This branch includes the following new commits:

     new 056eb50  [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch pulsarbot/cherry-pick-8519
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 056eb50747d96f3ea8c9bd27e4896e959f339c06
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Nov 12 09:45:32 2020 +0800

    [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)
    
    ### Motivation
    
    The timer callback of `AckGroupingTrackerEnabled` captures only `this`, which is a raw, non-owning pointer to the `AckGroupingTrackerEnabled` instance. If the instance goes out of scope and is destroyed, `this` points to an invalid block of memory.
    
    Even though the destructor of `AckGroupingTrackerEnabled` cancels the timer, the callback may not be invoked immediately. It is still possible that, when the callback is eventually invoked, the error code is 0 but accessing `this` is invalid. For example, the following crash, caused by the callback, was observed in a production environment and is hard to reproduce:
    
    ```
    #6 <signal handler called>
    #7 0x00007fb4e67c5cb8 in ?? ()
    #8 0x00007fb604981adb in operator() (ec=..., __closure=0x7fb52b0fb230)
       at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc:148
    #9 operator() (this=0x7fb52b0fb230) at /usr/local/include/boost/asio/detail/bind_handler.hpp:47
    ```
    
    ### Modifications
    
    - Use `std::shared_ptr` instead of `std::unique_ptr` for `AckGroupingTrackerEnabled`, then capture the shared pointer in timer callback's lambda expression to extend the lifetime of `this`.
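    - Add a `start()` method to `AckGroupingTracker` and schedule the timer there instead of in the constructor, because `shared_from_this()` cannot be used inside a constructor: no `std::shared_ptr` owns the object yet, so the call throws `std::bad_weak_ptr` (since C++17; it is undefined behaviour before that).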
    - Use `std::weak_ptr` to reference `HandlerBase`, because the handler may already have been destroyed when the timer callback is triggered.
---
 pulsar-client-cpp/lib/AckGroupingTracker.h         |  9 ++-
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc | 21 ++++---
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h  |  7 ++-
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 41 +++++++------
 pulsar-client-cpp/lib/ConsumerImpl.h               |  2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 70 ++++++++++++----------
 pulsar-client-cpp/tests/PulsarFriend.h             |  4 ++
 7 files changed, 90 insertions(+), 64 deletions(-)

diff --git a/pulsar-client-cpp/lib/AckGroupingTracker.h b/pulsar-client-cpp/lib/AckGroupingTracker.h
index a5b3d66..f4410e4 100644
--- a/pulsar-client-cpp/lib/AckGroupingTracker.h
+++ b/pulsar-client-cpp/lib/AckGroupingTracker.h
@@ -35,12 +35,17 @@ namespace pulsar {
  * Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
  * It can be directly used by consumers for non-persistent topics.
  */
-class AckGroupingTracker {
+class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
    public:
     AckGroupingTracker() = default;
     virtual ~AckGroupingTracker() = default;
 
     /**
+     * Start tracking the ACK requests.
+     */
+    virtual void start() {}
+
+    /**
      * Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
      * discard messages that are being resent after a disconnection and for which the user has
      * already sent an acknowledgement.
@@ -102,7 +107,7 @@ class AckGroupingTracker {
                         const std::set<MessageId>& msgIds);
 };  // class AckGroupingTracker
 
-using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;
+using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
 
 }  // namespace pulsar
 #endif /* LIB_ACKGROUPINGTRACKER_H_ */
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
index 5c61cca..a651c7a 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
@@ -33,11 +33,11 @@ namespace pulsar {
 
 DECLARE_LOG_OBJECT();
 
-AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler,
-                                                     uint64_t consumerId, long ackGroupingTimeMs,
-                                                     long ackGroupingMaxSize)
+AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
+                                                     const HandlerBasePtr& handlerPtr, uint64_t consumerId,
+                                                     long ackGroupingTimeMs, long ackGroupingMaxSize)
     : AckGroupingTracker(),
-      handler_(handler),
+      handlerWeakPtr_(handlerPtr),
       consumerId_(consumerId),
       nextCumulativeAckMsgId_(MessageId::earliest()),
       requireCumulativeAck_(false),
@@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
       mutexTimer_() {
     LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
                                                         << ackGroupingMaxSize);
-    this->scheduleTimer();
 }
 
+void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
+
 bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
     {
         // Check if the message ID is already ACKed by a previous (or pending) cumulative request.
@@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() {
 }
 
 void AckGroupingTrackerEnabled::flush() {
-    auto cnx = this->handler_.getCnx().lock();
+    auto handler = handlerWeakPtr_.lock();
+    if (!handler) {
+        LOG_WARN("Reference to the HandlerBase is not valid.");
+        return;
+    }
+    auto cnx = handler->getCnx().lock();
     if (cnx == nullptr) {
         LOG_DEBUG("Connection is not ready, grouping ACK failed.");
         return;
@@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
     this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
-    this->timer_->async_wait([this](const boost::system::error_code& ec) -> void {
+    auto self = shared_from_this();
+    this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
         if (!ec) {
             this->flush();
             this->scheduleTimer();
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
index 9fb871a..c3926aa 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
@@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     /**
      * Constructing ACK grouping tracker for peresistent topics.
      * @param[in] clientPtr pointer to client object.
-     * @param[in] handler the connection handler.
+     * @param[in] handlerPtr the shared pointer to connection handler.
      * @param[in] consumerId consumer ID that this tracker belongs to.
      * @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
      * @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
      */
-    AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId,
+    AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
                               long ackGroupingTimeMs, long ackGroupingMaxSize);
 
+    void start() override;
     bool isDuplicate(const MessageId& msgId) override;
     void addAcknowledge(const MessageId& msgId) override;
     void addAcknowledgeCumulative(const MessageId& msgId) override;
@@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     void scheduleTimer();
 
     //! The connection handler.
-    HandlerBase& handler_;
+    HandlerBaseWeakPtr handlerWeakPtr_;
 
     //! ID of the consumer that this tracker belongs to.
     uint64_t consumerId_;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index dedd9b2..e85fbf1 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
       brokerConsumerStats_(),
       consumerStatsBasePtr_(),
       negativeAcksTracker_(client, *this, conf),
+      ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
       msgCrypto_(),
       readCompacted_(conf.isReadCompacted()),
       lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
@@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
     if (conf.isEncryptionEnabled()) {
         msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
     }
-
-    // Initialize ACK grouping tracker.
-    if (TopicName::get(topic)->isPersistent()) {
-        // Persistent topic, ACK requests need to be sent to broker.
-        if (conf.getAckGroupingTimeMs() > 0) {
-            // Grouping ACK is ENABLED because grouping time value is a positive value.
-            this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize()));
-        } else {
-            // Grouping ACK is DISABLED because grouping time value is a non-positive value.
-            this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_));
-        }
-    } else {
-        // Non-persistent topic, ACK requests do NOT need to be sent to broker.
-        LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
-        this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker());
-    }
 }
 
 ConsumerImpl::~ConsumerImpl() {
@@ -143,7 +127,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu
 
 const std::string& ConsumerImpl::getTopic() const { return topic_; }
 
-void ConsumerImpl::start() { grabCnx(); }
+void ConsumerImpl::start() {
+    HandlerBase::start();
+
+    // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+    // constructor completed.
+    if (TopicName::get(topic_)->isPersistent()) {
+        if (config_.getAckGroupingTimeMs() > 0) {
+            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
+                client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+                config_.getAckGroupingMaxSize()));
+        } else {
+            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
+        }
+    } else {
+        LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
+    }
+    ackGroupingTrackerPtr_->start();
+}
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lock(mutex_);
@@ -871,7 +872,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     state_ = Closing;
 
     // Flush pending grouped ACK requests.
-    this->ackGroupingTrackerPtr_->close();
+    if (ackGroupingTrackerPtr_) {
+        ackGroupingTrackerPtr_->close();
+    }
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 9306a91..e122a4a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     NegativeAcksTracker negativeAcksTracker_;
-    AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_;
+    AckGroupingTrackerPtr ackGroupingTrackerPtr_;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index acad16a..e3ae2f2 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3513,6 +3513,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {
 
     // Send ACK.
     AckGroupingTrackerMock tracker(false);
+    tracker.start();
     for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
         auto connPtr = connWeakPtr.lock();
         ASSERT_NE(connPtr, nullptr);
@@ -3563,6 +3564,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {
 
     // Send ACK.
     AckGroupingTrackerMock tracker(false);
+    tracker.start();
     std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
     ASSERT_EQ(restMsgId.size(), numMsg);
     ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId));
@@ -3669,9 +3671,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {
 
 class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
    public:
-    AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId,
-                                  long ackGroupingTimeMs, long ackGroupingMaxSize)
-        : AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {}
+    AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr,
+                                  uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize)
+        : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs,
+                                    ackGroupingMaxSize) {}
     const std::set<MessageId> &getPendingIndividualAcks() { return this->pendingIndividualAcks_; }
     const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
     const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
@@ -3698,7 +3701,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3713,22 +3716,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
         recvMsgId.emplace_back(msg.getMessageId());
     }
 
-    AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(),
-                                          ackGroupingTimeMs, ackGroupingMaxSize);
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
-    ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs);
-    ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize);
+    auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker->start();
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
+    ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
+    ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
     for (auto &msgId : recvMsgId) {
-        ASSERT_FALSE(tracker.isDuplicate(msgId));
-        tracker.addAcknowledge(msgId);
-        ASSERT_TRUE(tracker.isDuplicate(msgId));
+        ASSERT_FALSE(tracker->isDuplicate(msgId));
+        tracker->addAcknowledge(msgId);
+        ASSERT_TRUE(tracker->isDuplicate(msgId));
     }
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size());
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());
 
     std::this_thread::sleep_for(std::chrono::seconds(2));
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
     for (auto &msgId : recvMsgId) {
-        ASSERT_FALSE(tracker.isDuplicate(msgId));
+        ASSERT_FALSE(tracker->isDuplicate(msgId));
     }
     consumer.close();
 
@@ -3757,7 +3761,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3773,32 +3777,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
     }
     std::sort(recvMsgId.begin(), recvMsgId.end());
 
-    AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(),
-                                           ackGroupingTimeMs, ackGroupingMaxSize);
-    ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest());
-    ASSERT_FALSE(tracker0.requireCumulativeAck());
+    auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker0->start();
+    ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
+    ASSERT_FALSE(tracker0->requireCumulativeAck());
 
     auto targetMsgId = recvMsgId[numMsg / 2];
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
     }
-    tracker0.addAcknowledgeCumulative(targetMsgId);
+    tracker0->addAcknowledgeCumulative(targetMsgId);
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
     }
-    ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId);
-    ASSERT_TRUE(tracker0.requireCumulativeAck());
+    ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId);
+    ASSERT_TRUE(tracker0->requireCumulativeAck());
 
     std::this_thread::sleep_for(std::chrono::seconds(2));
-    ASSERT_FALSE(tracker0.requireCumulativeAck());
+    ASSERT_FALSE(tracker0->requireCumulativeAck());
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
     }
     consumer.close();
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer);
     std::set<MessageId> restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end());
     for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
         Message msg;
@@ -3808,10 +3813,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
     Message msg;
     auto ret = consumer.receive(msg, 1000);
     ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
-    AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(),
-                                           ackGroupingTimeMs, ackGroupingMaxSize);
-    tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
-    tracker1.close();
+    auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker1->start();
+    tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
+    tracker1->close();
     consumer.close();
 
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 26a1fe8..87b48d2 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -82,6 +82,10 @@ class PulsarFriend {
         return *consumerImpl;
     }
 
+    static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
+        return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
+    }
+
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
 
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {