You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/04 10:04:59 UTC
[pulsar] 01/03: [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2e761b8a0ddfc6a0097f85f2f326f41aefb43a44
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Aug 3 09:56:21 2022 +0800
[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)
(cherry picked from commit 7c8cd7bdefbe277af07768b5d2fdb01809bf9404)
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 4 +++
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 +++
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 7 ++++
pulsar-client-cpp/tests/ConsumerTest.cc | 43 ++++++++++++++++++++++++
4 files changed, 58 insertions(+)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 84583d521cc..598d2c1ae19 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -721,6 +721,10 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
messageProcessed(msg);
return ResultOk;
} else {
+ Lock lock(mutex_);
+ if (state_ != Ready) {
+ return ResultAlreadyClosed;
+ }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..361a863d247 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,6 +493,10 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
+ lock.lock();
+ if (state_ != Ready) {
+ return ResultAlreadyClosed;
+ }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..48f1e4a398b 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,6 +106,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
+ lock.lock();
+ if (state_ != Ready) {
+ return ResultAlreadyClosed;
+ }
return ResultTimeout;
}
}
@@ -428,6 +432,9 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
void PartitionedConsumerImpl::failPendingReceiveCallback() {
Message msg;
+
+ messages_.close();
+
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b61c15a8866..adb94a4ad13 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -720,4 +720,47 @@ TEST(ConsumerTest, testIsConnected) {
ASSERT_FALSE(consumer.isConnected());
}
+TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
+ Client client(lookupUrl);
+ const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
+ constexpr int numPartitions = 2;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+ Message msg = MessageBuilder().setContent("message").build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+
+ producer.close();
+
+ // receive message on another thread
+ pulsar::Latch latch(1);
+ auto thread = std::thread([&]() {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
+ consumer.acknowledge(msg.getMessageId());
+ ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
+ latch.countdown();
+ });
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ consumer.close();
+
+ bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
+
+ ASSERT_TRUE(wasUnblocked);
+ thread.join();
+}
+
} // namespace pulsar