You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by gi...@apache.org on 2020/11/12 01:50:27 UTC

[pulsar] 01/01: [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch pulsarbot/cherry-pick-8519
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 056eb50747d96f3ea8c9bd27e4896e959f339c06
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Nov 12 09:45:32 2020 +0800

    [C++] Fix potential crash caused by AckGroupTracker's timer (#8519)
    
    ### Motivation
    
    The timer callback of `AckGroupingTrackerEnabled` only captures `this`, which is a raw, non-owning pointer to the `AckGroupingTrackerEnabled` instance. If the instance goes out of scope and is destroyed, `this` points to an invalid memory block.
    
    Even if the destructor of `AckGroupingTrackerEnabled` cancels the timer, the callback is not necessarily triggered immediately. There is still a possibility that a callback that was already queued is triggered later with an error code of 0 (success), at which point accessing `this` is invalid. For example, the following crash, caused by the callback, was observed in a production environment and is hard to reproduce:
    
    ```
    #6 <signal handler called>
    #7 0x00007fb4e67c5cb8 in ?? ()
    #8 0x00007fb604981adb in operator() (ec=..., __closure=0x7fb52b0fb230)
       at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc:148
    #9 operator() (this=0x7fb52b0fb230) at /usr/local/include/boost/asio/detail/bind_handler.hpp:47
    ```
    
    ### Modifications
    
    - Use `std::shared_ptr` instead of `std::unique_ptr` for `AckGroupingTrackerEnabled`, then capture the shared pointer in the timer callback's lambda expression to extend the lifetime of `this`.
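    - Add a `start()` method to `AckGroupingTracker` and schedule the timer there instead of in the constructor, because `shared_from_this()` must not be called in a constructor: the object is not yet owned by any `std::shared_ptr`, so the call throws `std::bad_weak_ptr` (since C++17; it is undefined behavior before that).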
    - Use `std::weak_ptr` to reference `HandlerBase`, in case the handler is no longer valid when the timer callback is triggered.
---
 pulsar-client-cpp/lib/AckGroupingTracker.h         |  9 ++-
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc | 21 ++++---
 pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h  |  7 ++-
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 41 +++++++------
 pulsar-client-cpp/lib/ConsumerImpl.h               |  2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 70 ++++++++++++----------
 pulsar-client-cpp/tests/PulsarFriend.h             |  4 ++
 7 files changed, 90 insertions(+), 64 deletions(-)

diff --git a/pulsar-client-cpp/lib/AckGroupingTracker.h b/pulsar-client-cpp/lib/AckGroupingTracker.h
index a5b3d66..f4410e4 100644
--- a/pulsar-client-cpp/lib/AckGroupingTracker.h
+++ b/pulsar-client-cpp/lib/AckGroupingTracker.h
@@ -35,12 +35,17 @@ namespace pulsar {
  * Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
  * It can be directly used by consumers for non-persistent topics.
  */
-class AckGroupingTracker {
+class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
    public:
     AckGroupingTracker() = default;
     virtual ~AckGroupingTracker() = default;
 
     /**
+     * Start tracking the ACK requests.
+     */
+    virtual void start() {}
+
+    /**
      * Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
      * discard messages that are being resent after a disconnection and for which the user has
      * already sent an acknowledgement.
@@ -102,7 +107,7 @@ class AckGroupingTracker {
                         const std::set<MessageId>& msgIds);
 };  // class AckGroupingTracker
 
-using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;
+using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
 
 }  // namespace pulsar
 #endif /* LIB_ACKGROUPINGTRACKER_H_ */
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
index 5c61cca..a651c7a 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
@@ -33,11 +33,11 @@ namespace pulsar {
 
 DECLARE_LOG_OBJECT();
 
-AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler,
-                                                     uint64_t consumerId, long ackGroupingTimeMs,
-                                                     long ackGroupingMaxSize)
+AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
+                                                     const HandlerBasePtr& handlerPtr, uint64_t consumerId,
+                                                     long ackGroupingTimeMs, long ackGroupingMaxSize)
     : AckGroupingTracker(),
-      handler_(handler),
+      handlerWeakPtr_(handlerPtr),
       consumerId_(consumerId),
       nextCumulativeAckMsgId_(MessageId::earliest()),
       requireCumulativeAck_(false),
@@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
       mutexTimer_() {
     LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
                                                         << ackGroupingMaxSize);
-    this->scheduleTimer();
 }
 
+void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }
+
 bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
     {
         // Check if the message ID is already ACKed by a previous (or pending) cumulative request.
@@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() {
 }
 
 void AckGroupingTrackerEnabled::flush() {
-    auto cnx = this->handler_.getCnx().lock();
+    auto handler = handlerWeakPtr_.lock();
+    if (!handler) {
+        LOG_WARN("Reference to the HandlerBase is not valid.");
+        return;
+    }
+    auto cnx = handler->getCnx().lock();
     if (cnx == nullptr) {
         LOG_DEBUG("Connection is not ready, grouping ACK failed.");
         return;
@@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
     std::lock_guard<std::mutex> lock(this->mutexTimer_);
     this->timer_ = this->executor_->createDeadlineTimer();
     this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
-    this->timer_->async_wait([this](const boost::system::error_code& ec) -> void {
+    auto self = shared_from_this();
+    this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
         if (!ec) {
             this->flush();
             this->scheduleTimer();
diff --git a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
index 9fb871a..c3926aa 100644
--- a/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
@@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     /**
      * Constructing ACK grouping tracker for peresistent topics.
      * @param[in] clientPtr pointer to client object.
-     * @param[in] handler the connection handler.
+     * @param[in] handlerPtr the shared pointer to connection handler.
      * @param[in] consumerId consumer ID that this tracker belongs to.
      * @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
      * @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
      */
-    AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId,
+    AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
                               long ackGroupingTimeMs, long ackGroupingMaxSize);
 
+    void start() override;
     bool isDuplicate(const MessageId& msgId) override;
     void addAcknowledge(const MessageId& msgId) override;
     void addAcknowledgeCumulative(const MessageId& msgId) override;
@@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
     void scheduleTimer();
 
     //! The connection handler.
-    HandlerBase& handler_;
+    HandlerBaseWeakPtr handlerWeakPtr_;
 
     //! ID of the consumer that this tracker belongs to.
     uint64_t consumerId_;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index dedd9b2..e85fbf1 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
       brokerConsumerStats_(),
       consumerStatsBasePtr_(),
       negativeAcksTracker_(client, *this, conf),
+      ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
       msgCrypto_(),
       readCompacted_(conf.isReadCompacted()),
       lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
@@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
     if (conf.isEncryptionEnabled()) {
         msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
     }
-
-    // Initialize ACK grouping tracker.
-    if (TopicName::get(topic)->isPersistent()) {
-        // Persistent topic, ACK requests need to be sent to broker.
-        if (conf.getAckGroupingTimeMs() > 0) {
-            // Grouping ACK is ENABLED because grouping time value is a positive value.
-            this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
-                client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize()));
-        } else {
-            // Grouping ACK is DISABLED because grouping time value is a non-positive value.
-            this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_));
-        }
-    } else {
-        // Non-persistent topic, ACK requests do NOT need to be sent to broker.
-        LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
-        this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker());
-    }
 }
 
 ConsumerImpl::~ConsumerImpl() {
@@ -143,7 +127,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu
 
 const std::string& ConsumerImpl::getTopic() const { return topic_; }
 
-void ConsumerImpl::start() { grabCnx(); }
+void ConsumerImpl::start() {
+    HandlerBase::start();
+
+    // Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
+    // constructor completed.
+    if (TopicName::get(topic_)->isPersistent()) {
+        if (config_.getAckGroupingTimeMs() > 0) {
+            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
+                client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
+                config_.getAckGroupingMaxSize()));
+        } else {
+            ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
+        }
+    } else {
+        LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
+    }
+    ackGroupingTrackerPtr_->start();
+}
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lock(mutex_);
@@ -871,7 +872,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     state_ = Closing;
 
     // Flush pending grouped ACK requests.
-    this->ackGroupingTrackerPtr_->close();
+    if (ackGroupingTrackerPtr_) {
+        ackGroupingTrackerPtr_->close();
+    }
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 9306a91..e122a4a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     NegativeAcksTracker negativeAcksTracker_;
-    AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_;
+    AckGroupingTrackerPtr ackGroupingTrackerPtr_;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index acad16a..e3ae2f2 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3513,6 +3513,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {
 
     // Send ACK.
     AckGroupingTrackerMock tracker(false);
+    tracker.start();
     for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
         auto connPtr = connWeakPtr.lock();
         ASSERT_NE(connPtr, nullptr);
@@ -3563,6 +3564,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {
 
     // Send ACK.
     AckGroupingTrackerMock tracker(false);
+    tracker.start();
     std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
     ASSERT_EQ(restMsgId.size(), numMsg);
     ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId));
@@ -3669,9 +3671,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {
 
 class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
    public:
-    AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId,
-                                  long ackGroupingTimeMs, long ackGroupingMaxSize)
-        : AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {}
+    AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr,
+                                  uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize)
+        : AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs,
+                                    ackGroupingMaxSize) {}
     const std::set<MessageId> &getPendingIndividualAcks() { return this->pendingIndividualAcks_; }
     const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
     const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
@@ -3698,7 +3701,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3713,22 +3716,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
         recvMsgId.emplace_back(msg.getMessageId());
     }
 
-    AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(),
-                                          ackGroupingTimeMs, ackGroupingMaxSize);
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
-    ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs);
-    ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize);
+    auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker->start();
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
+    ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
+    ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
     for (auto &msgId : recvMsgId) {
-        ASSERT_FALSE(tracker.isDuplicate(msgId));
-        tracker.addAcknowledge(msgId);
-        ASSERT_TRUE(tracker.isDuplicate(msgId));
+        ASSERT_FALSE(tracker->isDuplicate(msgId));
+        tracker->addAcknowledge(msgId);
+        ASSERT_TRUE(tracker->isDuplicate(msgId));
     }
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size());
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());
 
     std::this_thread::sleep_for(std::chrono::seconds(2));
-    ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
+    ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
     for (auto &msgId : recvMsgId) {
-        ASSERT_FALSE(tracker.isDuplicate(msgId));
+        ASSERT_FALSE(tracker->isDuplicate(msgId));
     }
     consumer.close();
 
@@ -3757,7 +3761,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer);
 
     // Sending and receiving messages.
     for (auto count = 0; count < numMsg; ++count) {
@@ -3773,32 +3777,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
     }
     std::sort(recvMsgId.begin(), recvMsgId.end());
 
-    AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(),
-                                           ackGroupingTimeMs, ackGroupingMaxSize);
-    ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest());
-    ASSERT_FALSE(tracker0.requireCumulativeAck());
+    auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker0->start();
+    ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
+    ASSERT_FALSE(tracker0->requireCumulativeAck());
 
     auto targetMsgId = recvMsgId[numMsg / 2];
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
     }
-    tracker0.addAcknowledgeCumulative(targetMsgId);
+    tracker0->addAcknowledgeCumulative(targetMsgId);
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
     }
-    ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId);
-    ASSERT_TRUE(tracker0.requireCumulativeAck());
+    ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId);
+    ASSERT_TRUE(tracker0->requireCumulativeAck());
 
     std::this_thread::sleep_for(std::chrono::seconds(2));
-    ASSERT_FALSE(tracker0.requireCumulativeAck());
+    ASSERT_FALSE(tracker0->requireCumulativeAck());
     for (auto idx = 0; idx <= numMsg / 2; ++idx) {
-        ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
+        ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
     }
     consumer.close();
 
     std::this_thread::sleep_for(std::chrono::seconds(1));
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
-    auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
+    auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer);
     std::set<MessageId> restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end());
     for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
         Message msg;
@@ -3808,10 +3813,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
     Message msg;
     auto ret = consumer.receive(msg, 1000);
     ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
-    AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(),
-                                           ackGroupingTimeMs, ackGroupingMaxSize);
-    tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
-    tracker1.close();
+    auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
+        clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
+    tracker1->start();
+    tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
+    tracker1->close();
     consumer.close();
 
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index 26a1fe8..87b48d2 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -82,6 +82,10 @@ class PulsarFriend {
         return *consumerImpl;
     }
 
+    static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
+        return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
+    }
+
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
 
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {