You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2023/02/09 13:58:42 UTC
[pulsar-client-cpp] branch main updated: NegativeAcksTracker need close when consumer closed. (#188)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new c3e3e8a NegativeAcksTracker need close when consumer closed. (#188)
c3e3e8a is described below
commit c3e3e8add445b8afe1a6cf5c396244b54e8e3218
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu Feb 9 21:58:36 2023 +0800
NegativeAcksTracker need close when consumer closed. (#188)
---
lib/ConsumerImpl.cc | 2 ++
lib/ConsumerImpl.h | 1 +
lib/NegativeAcksTracker.cc | 2 ++
lib/NegativeAcksTracker.h | 4 ++++
tests/ConsumerTest.cc | 30 ++++++++++++++++++++++++++++++
5 files changed, 39 insertions(+)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 56fcbe9..1703d2a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1186,6 +1186,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
if (ackGroupingTrackerPtr_) {
ackGroupingTrackerPtr_->close();
}
+ negativeAcksTracker_.close();
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
@@ -1222,6 +1223,7 @@ void ConsumerImpl::shutdown() {
if (client) {
client->cleanupConsumer(this);
}
+ negativeAcksTracker_.close();
cancelTimers();
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
failPendingReceiveCallback();
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index b9292c0..66e0cd0 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -332,6 +332,7 @@ class ConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
+ FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
FRIEND_TEST(DeadLetterQueueTest, testAutoSetDLQTopicName);
};
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 9dcca20..7807808 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -105,6 +105,8 @@ void NegativeAcksTracker::close() {
boost::system::error_code ec;
timer_->cancel(ec);
}
+ timer_ = nullptr;
+ nackedMessages_.clear();
}
void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index c5a945b..f8b334b 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -28,6 +28,8 @@
#include <memory>
#include <mutex>
+#include "TestUtil.h"
+
namespace pulsar {
class ConsumerImpl;
@@ -66,6 +68,8 @@ class NegativeAcksTracker {
ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
bool enabledForTesting_; // to be able to test deterministically
+
+ FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
};
} // namespace pulsar
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index ac971b3..cf4d8f8 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1005,6 +1005,36 @@ TEST_P(ConsumerSeekTest, testSeekForMessageId) {
producer.close();
}
+TEST(ConsumerTest, testNegativeAcksTrackerClose) {  // Verifies that after consumer.close() the consumer's NegativeAcksTracker holds no nacked messages.
+ Client client(lookupUrl);
+ auto topicName = "testNegativeAcksTrackerClose";
+
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setNegativeAckRedeliveryDelayMs(100);
+ Consumer consumer;
+ client.subscribe(topicName, "test-sub", consumerConfig, consumer);  // NOTE(review): any result returned by subscribe() is not checked here
+
+ Producer producer;
+ client.createProducer(topicName, producer);  // NOTE(review): any result returned by createProducer() is not checked here
+
+ for (int i = 0; i < 10; ++i) {  // publish ten messages whose content is the loop index as text
+ producer.send(MessageBuilder().setContent(std::to_string(i)).build());
+ }
+
+ Message msg;
+ PulsarFriend::setNegativeAckEnabled(consumer, false);  // NOTE(review): presumably turns off the tracker's redelivery handling so nacked messages stay recorded until close() - confirm against PulsarFriend
+ for (int i = 0; i < 10; ++i) {  // receive each message and negatively acknowledge it
+ consumer.receive(msg);
+ consumer.negativeAcknowledge(msg);
+ }
+
+ consumer.close();
+ auto consumerImplPtr = PulsarFriend::getConsumerImplPtr(consumer);  // the members read on the next line are reached through the FRIEND_TEST declarations for this test
+ ASSERT_TRUE(consumerImplPtr->negativeAcksTracker_.nackedMessages_.empty());  // closing the consumer must have emptied the tracker's nacked-message container
+
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
} // namespace pulsar