You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/03 01:56:27 UTC

[pulsar] branch master updated: [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c8cd7bdefb [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)
7c8cd7bdefb is described below

commit 7c8cd7bdefbe277af07768b5d2fdb01809bf9404
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Aug 3 09:56:21 2022 +0800

    [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  4 +++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  4 +++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  7 ++++
 pulsar-client-cpp/tests/ConsumerTest.cc          | 43 ++++++++++++++++++++++++
 4 files changed, 58 insertions(+)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index a9ed815ac2a..0cd5d5810b9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -824,6 +824,10 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         messageProcessed(msg);
         return ResultOk;
     } else {
+        Lock lock(mutex_);
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ad9d606810..39b94e454dc 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,6 +493,10 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
+        lock.lock();
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 23288e2f811..c7985e7434a 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,6 +106,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
+        lock.lock();
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
@@ -427,6 +431,9 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
 
 void PartitionedConsumerImpl::failPendingReceiveCallback() {
     Message msg;
+
+    messages_.close();
+
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b61c15a8866..adb94a4ad13 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -720,4 +720,47 @@ TEST(ConsumerTest, testIsConnected) {
     ASSERT_FALSE(consumer.isConnected());
 }
 
+TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
+    Client client(lookupUrl);
+    const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
+    constexpr int numPartitions = 2;
+
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+    Message msg = MessageBuilder().setContent("message").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    producer.close();
+
+    // receive message on another thread
+    pulsar::Latch latch(1);
+    auto thread = std::thread([&]() {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
+        consumer.acknowledge(msg.getMessageId());
+        ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
+        latch.countdown();
+    });
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    consumer.close();
+
+    bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
+
+    ASSERT_TRUE(wasUnblocked);
+    thread.join();
+}
+
 }  // namespace pulsar