You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 03:49:15 UTC
[pulsar] branch branch-2.9 updated: [fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3b8ebf879c4 [fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
3b8ebf879c4 is described below
commit 3b8ebf879c418ef8605531c0764005691045d207
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Sep 15 13:12:43 2022 +0900
[fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
(cherry picked from commit a98f025a35935a7a27db3f0919aa6f4453e4fb02)
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 17 ++++--
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 +
pulsar-client-cpp/tests/ConsumerTest.cc | 71 ++++++++++++++++++++++++
3 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c35d84b6db6..c78c12eaec0 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -451,8 +451,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
- listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
+ listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultOk, msg, callback));
} else {
if (messages_.full()) {
lock.unlock();
@@ -469,7 +469,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);
-
+ unAckedMessageTrackerPtr_->add(m.getMessageId());
try {
messageListener_(Consumer(shared_from_this()), m);
} catch (const std::exception& e) {
@@ -535,11 +535,20 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
- listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
+ listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+ const ReceiveCallback& callback) {
+ if (result == ResultOk) {
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ }
+ callback(result, msg);
+}
+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 95c24f68c5b..8769d59b990 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
+ void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b1fc11cec8f..e672da33c09 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -475,6 +475,77 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
client.close();
}
+TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
+ ClientConfiguration clientConfig;
+ clientConfig.setMessageListenerThreads(1);
+ Client client(lookupUrl, clientConfig);
+
+ const std::string partitionedTopic =
+ "testPartitionedConsumerUnexpectedAckTimeout" + std::to_string(time(nullptr));
+ std::string subName = "sub";
+ constexpr int numPartitions = 2;
+ constexpr int numOfMessages = 3;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+ pulsar::Latch latch(numOfMessages);
+ std::vector<Message> messages;
+ std::mutex mtx;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setConsumerType(ConsumerShared);
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
+ // acknowledge received messages immediately, so no ack timeout is expected
+ ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
+ ASSERT_EQ(0, msg.getRedeliveryCount());
+
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ messages.emplace_back(msg);
+ }
+
+ if (latch.getCount() > 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(unAckedMessagesTimeoutMs + tickDurationInMs * 2));
+ latch.countdown();
+ }
+ });
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer));
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setBlockIfQueueFull(true);
+ producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ producer.close();
+
+ bool wasUnblocked = latch.wait(
+ std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs * 2) * numOfMessages + 5000));
+ ASSERT_TRUE(wasUnblocked);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+ // messages are expected not to be redelivered
+ ASSERT_EQ(numOfMessages, messages.size());
+
+ consumer.close();
+ client.close();
+}
+
TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
Client client(lookupUrl);
const std::string nonPartitionedTopic =