You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/05 03:19:35 UTC

[pulsar] branch branch-2.9 updated: Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new c2b685bdb8c Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"
c2b685bdb8c is described below

commit c2b685bdb8c2d99f66aac06aae2d5e328362980a
Author: mattison chao <ma...@gmail.com>
AuthorDate: Fri Aug 5 11:12:35 2022 +0800

    Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"
    
    This reverts commit 2e761b8a0ddfc6a0097f85f2f326f41aefb43a44.
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  4 ---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  4 ---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  7 ----
 pulsar-client-cpp/tests/ConsumerTest.cc          | 43 ------------------------
 4 files changed, 58 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 598d2c1ae19..84583d521cc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -721,10 +721,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         messageProcessed(msg);
         return ResultOk;
     } else {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 361a863d247..0ae86d5879a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,10 +493,6 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
-        lock.lock();
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 48f1e4a398b..e43b5090e43 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,10 +106,6 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
-        lock.lock();
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
@@ -432,9 +428,6 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
 
 void PartitionedConsumerImpl::failPendingReceiveCallback() {
     Message msg;
-
-    messages_.close();
-
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index adb94a4ad13..b61c15a8866 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -720,47 +720,4 @@ TEST(ConsumerTest, testIsConnected) {
     ASSERT_FALSE(consumer.isConnected());
 }
 
-TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
-    Client client(lookupUrl);
-    const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
-    constexpr int numPartitions = 2;
-
-    int res =
-        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
-                       std::to_string(numPartitions));
-    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
-    Consumer consumer;
-    ConsumerConfiguration consumerConfig;
-    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));
-
-    // send messages
-    ProducerConfiguration producerConfig;
-    Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
-    Message msg = MessageBuilder().setContent("message").build();
-    ASSERT_EQ(ResultOk, producer.send(msg));
-
-    producer.close();
-
-    // receive message on another thread
-    pulsar::Latch latch(1);
-    auto thread = std::thread([&]() {
-        Message msg;
-        ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
-        consumer.acknowledge(msg.getMessageId());
-        ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
-        latch.countdown();
-    });
-
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-
-    consumer.close();
-
-    bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
-
-    ASSERT_TRUE(wasUnblocked);
-    thread.join();
-}
-
 }  // namespace pulsar