You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/04 10:04:59 UTC

[pulsar] 01/03: [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2e761b8a0ddfc6a0097f85f2f326f41aefb43a44
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Aug 3 09:56:21 2022 +0800

    [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)
    
    (cherry picked from commit 7c8cd7bdefbe277af07768b5d2fdb01809bf9404)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  4 +++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  4 +++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  7 ++++
 pulsar-client-cpp/tests/ConsumerTest.cc          | 43 ++++++++++++++++++++++++
 4 files changed, 58 insertions(+)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 84583d521cc..598d2c1ae19 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -721,6 +721,10 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         messageProcessed(msg);
         return ResultOk;
     } else {
+        Lock lock(mutex_);
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..361a863d247 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,6 +493,10 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
+        lock.lock();
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..48f1e4a398b 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,6 +106,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
+        lock.lock();
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
         return ResultTimeout;
     }
 }
@@ -428,6 +432,9 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
 
 void PartitionedConsumerImpl::failPendingReceiveCallback() {
     Message msg;
+
+    messages_.close();
+
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b61c15a8866..adb94a4ad13 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -720,4 +720,47 @@ TEST(ConsumerTest, testIsConnected) {
     ASSERT_FALSE(consumer.isConnected());
 }
 
+TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
+    Client client(lookupUrl);
+    const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
+    constexpr int numPartitions = 2;
+    // Checks that close() on a partitioned-topic consumer unblocks a receive() pending on another thread.
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+    Message msg = MessageBuilder().setContent("message").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    producer.close();
+
+    // receive message on another thread
+    pulsar::Latch latch(1);
+    auto thread = std::thread([&]() {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
+        consumer.acknowledge(msg.getMessageId());
+        ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
+        latch.countdown();
+    });
+    // Let the receiver thread take the single message and enter its second receive() before closing.
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    consumer.close();
+    // The second receive() has a 10 s timeout; the latch must be released far sooner once close() returns.
+    bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
+    // Join before asserting: a fatal failure returns at once, and destroying a joinable thread terminates.
+    thread.join();
+    ASSERT_TRUE(wasUnblocked);
+}
+
 }  // namespace pulsar