You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/07 03:49:15 UTC

[pulsar] branch branch-2.9 updated: [fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3b8ebf879c4 [fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
3b8ebf879c4 is described below

commit 3b8ebf879c418ef8605531c0764005691045d207
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Thu Sep 15 13:12:43 2022 +0900

    [fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
    
    (cherry picked from commit a98f025a35935a7a27db3f0919aa6f4453e4fb02)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 17 ++++--
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h  |  1 +
 pulsar-client-cpp/tests/ConsumerTest.cc          | 71 ++++++++++++++++++++++++
 3 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c35d84b6db6..c78c12eaec0 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -451,8 +451,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
+        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+                                              shared_from_this(), ResultOk, msg, callback));
     } else {
         if (messages_.full()) {
             lock.unlock();
@@ -469,7 +469,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
 void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
     Message m;
     messages_.pop(m);
-
+    unAckedMessageTrackerPtr_->add(m.getMessageId());
     try {
         messageListener_(Consumer(shared_from_this()), m);
     } catch (const std::exception& e) {
@@ -535,11 +535,20 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
+        listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+                                                            const ReceiveCallback& callback) {
+    if (result == ResultOk) {
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    }
+    callback(result, msg);
+}
+
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     if (state_ != Ready) {
         callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 95c24f68c5b..8769d59b990 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
+    void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                   std::shared_ptr<std::atomic<int>> topicsNeedCreate);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b1fc11cec8f..e672da33c09 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -475,6 +475,77 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
     client.close();
 }
 
+TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
+    ClientConfiguration clientConfig;
+    clientConfig.setMessageListenerThreads(1);
+    Client client(lookupUrl, clientConfig);
+
+    const std::string partitionedTopic =
+        "testPartitionedConsumerUnexpectedAckTimeout" + std::to_string(time(nullptr));
+    std::string subName = "sub";
+    constexpr int numPartitions = 2;
+    constexpr int numOfMessages = 3;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+    pulsar::Latch latch(numOfMessages);
+    std::vector<Message> messages;
+    std::mutex mtx;
+
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setConsumerType(ConsumerShared);
+    consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    consumerConfig.setTickDurationInMs(tickDurationInMs);
+    consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
+        // acknowledge received messages immediately, so no ack timeout is expected
+        ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
+        ASSERT_EQ(0, msg.getRedeliveryCount());
+
+        {
+            std::lock_guard<std::mutex> lock(mtx);
+            messages.emplace_back(msg);
+        }
+
+        if (latch.getCount() > 0) {
+            std::this_thread::sleep_for(
+                std::chrono::milliseconds(unAckedMessagesTimeoutMs + tickDurationInMs * 2));
+            latch.countdown();
+        }
+    });
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer));
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setBlockIfQueueFull(true);
+    producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
+    std::string prefix = "message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+    producer.close();
+
+    bool wasUnblocked = latch.wait(
+        std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs * 2) * numOfMessages + 5000));
+    ASSERT_TRUE(wasUnblocked);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+    // messages are expected not to be redelivered
+    ASSERT_EQ(numOfMessages, messages.size());
+
+    consumer.close();
+    client.close();
+}
+
 TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
     Client client(lookupUrl);
     const std::string nonPartitionedTopic =