You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/02/09 13:58:42 UTC

[pulsar-client-cpp] branch main updated: NegativeAcksTracker need close when consumer closed. (#188)

This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new c3e3e8a  NegativeAcksTracker need close when consumer closed. (#188)
c3e3e8a is described below

commit c3e3e8add445b8afe1a6cf5c396244b54e8e3218
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu Feb 9 21:58:36 2023 +0800

    NegativeAcksTracker need close when consumer closed. (#188)
---
 lib/ConsumerImpl.cc        |  2 ++
 lib/ConsumerImpl.h         |  1 +
 lib/NegativeAcksTracker.cc |  2 ++
 lib/NegativeAcksTracker.h  |  4 ++++
 tests/ConsumerTest.cc      | 30 ++++++++++++++++++++++++++++++
 5 files changed, 39 insertions(+)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 56fcbe9..1703d2a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1186,6 +1186,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
     if (ackGroupingTrackerPtr_) {
         ackGroupingTrackerPtr_->close();
     }
+    negativeAcksTracker_.close();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
@@ -1222,6 +1223,7 @@ void ConsumerImpl::shutdown() {
     if (client) {
         client->cleanupConsumer(this);
     }
+    negativeAcksTracker_.close();
     cancelTimers();
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
     failPendingReceiveCallback();
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index b9292c0..66e0cd0 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -332,6 +332,7 @@ class ConsumerImpl : public ConsumerImplBase {
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
+    FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
     FRIEND_TEST(DeadLetterQueueTest, testAutoSetDLQTopicName);
 };
 
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 9dcca20..7807808 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -105,6 +105,8 @@ void NegativeAcksTracker::close() {
         boost::system::error_code ec;
         timer_->cancel(ec);
     }
+    timer_ = nullptr;
+    nackedMessages_.clear();
 }
 
 void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index c5a945b..f8b334b 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -28,6 +28,8 @@
 #include <memory>
 #include <mutex>
 
+#include "TestUtil.h"
+
 namespace pulsar {
 
 class ConsumerImpl;
@@ -66,6 +68,8 @@ class NegativeAcksTracker {
     ExecutorServicePtr executor_;
     DeadlineTimerPtr timer_;
     bool enabledForTesting_;  // to be able to test deterministically
+
+    FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
 };
 
 }  // namespace pulsar
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index ac971b3..cf4d8f8 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1005,6 +1005,36 @@ TEST_P(ConsumerSeekTest, testSeekForMessageId) {
     producer.close();
 }
 
+TEST(ConsumerTest, testNegativeAcksTrackerClose) {  // Checks that closing a consumer leaves no entries in its NegativeAcksTracker.
+    Client client(lookupUrl);
+    auto topicName = "testNegativeAcksTrackerClose";
+
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setNegativeAckRedeliveryDelayMs(100);  // configure a 100 ms negative-ack redelivery delay
+    Consumer consumer;
+    client.subscribe(topicName, "test-sub", consumerConfig, consumer);  // NOTE(review): returned result is not checked
+
+    Producer producer;
+    client.createProducer(topicName, producer);  // NOTE(review): returned result is not checked
+
+    for (int i = 0; i < 10; ++i) {  // publish 10 messages whose payloads are "0".."9"
+        producer.send(MessageBuilder().setContent(std::to_string(i)).build());
+    }
+
+    Message msg;
+    PulsarFriend::setNegativeAckEnabled(consumer, false);  // presumably disables the tracker's own redelivery so nacked ids stay tracked until close() - TODO confirm
+    for (int i = 0; i < 10; ++i) {  // receive each of the 10 messages and negatively acknowledge it
+        consumer.receive(msg);
+        consumer.negativeAcknowledge(msg);
+    }
+
+    consumer.close();  // the behavior under test: closing the consumer
+    auto consumerImplPtr = PulsarFriend::getConsumerImplPtr(consumer);
+    ASSERT_TRUE(consumerImplPtr->negativeAcksTracker_.nackedMessages_.empty());  // reads members of both classes; both declare FRIEND_TEST for this test
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
 
 }  // namespace pulsar