You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/05 03:19:35 UTC
[pulsar] branch branch-2.9 updated: Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new c2b685bdb8c Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"
c2b685bdb8c is described below
commit c2b685bdb8c2d99f66aac06aae2d5e328362980a
Author: mattison chao <ma...@gmail.com>
AuthorDate: Fri Aug 5 11:12:35 2022 +0800
Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887)"
This reverts commit 2e761b8a0ddfc6a0097f85f2f326f41aefb43a44.
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 ---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 7 ----
pulsar-client-cpp/tests/ConsumerTest.cc | 43 ------------------------
4 files changed, 58 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 598d2c1ae19..84583d521cc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -721,10 +721,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
messageProcessed(msg);
return ResultOk;
} else {
- Lock lock(mutex_);
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 361a863d247..0ae86d5879a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,10 +493,6 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
- lock.lock();
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 48f1e4a398b..e43b5090e43 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,10 +106,6 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
- lock.lock();
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
return ResultTimeout;
}
}
@@ -432,9 +428,6 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
void PartitionedConsumerImpl::failPendingReceiveCallback() {
Message msg;
-
- messages_.close();
-
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index adb94a4ad13..b61c15a8866 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -720,47 +720,4 @@ TEST(ConsumerTest, testIsConnected) {
ASSERT_FALSE(consumer.isConnected());
}
-TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
- Client client(lookupUrl);
- const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr));
- constexpr int numPartitions = 2;
-
- int res =
- makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
- std::to_string(numPartitions));
- ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
- Consumer consumer;
- ConsumerConfiguration consumerConfig;
- ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer));
-
- // send messages
- ProducerConfiguration producerConfig;
- Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
- Message msg = MessageBuilder().setContent("message").build();
- ASSERT_EQ(ResultOk, producer.send(msg));
-
- producer.close();
-
- // receive message on another thread
- pulsar::Latch latch(1);
- auto thread = std::thread([&]() {
- Message msg;
- ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
- consumer.acknowledge(msg.getMessageId());
- ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
- latch.countdown();
- });
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
-
- consumer.close();
-
- bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
-
- ASSERT_TRUE(wasUnblocked);
- thread.join();
-}
-
} // namespace pulsar