You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/12 00:51:10 UTC

[pulsar] branch master updated: [C++] UnAckedMessageTracker handles batch message incorrectly (#9170)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d4c1677  [C++] UnAckedMessageTracker handles batch message incorrectly (#9170)
d4c1677 is described below

commit d4c1677be93e5673672018f79da8a213514f01a3
Author: sijianliang <li...@foxmail.com>
AuthorDate: Tue Jan 12 08:50:35 2021 +0800

    [C++] UnAckedMessageTracker handles batch message incorrectly (#9170)
    
    ### Motivation
    
     The batchIndex of a MessageId must be reset when tracking a message, because only the ack of the last message in a batch is
     removed from `UnAckedMessageTrackerEnabled`.
    
    ### Verifying this change
    Added test case: testBatchUnAckedMessageTracker()
    
    ### Modifications
    
    When tracking or untracking a message, reset the batchIndex of its MessageId to -1.
---
 pulsar-client-cpp/lib/ConsumerImpl.h               |  1 +
 .../lib/UnAckedMessageTrackerEnabled.cc            | 16 ++---
 .../lib/UnAckedMessageTrackerEnabled.h             |  5 +-
 pulsar-client-cpp/tests/ConsumerTest.cc            | 68 ++++++++++++++++++++++
 4 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index ef60b94..313f1d9 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -223,6 +223,7 @@ class ConsumerImpl : public ConsumerImplBase,
 
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
 };
 
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 96dfd6c..3364c0e 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -90,12 +90,13 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
     timeoutHandler();
 }
 
-bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
+bool UnAckedMessageTrackerEnabled::add(const MessageId& msgId) {
     std::lock_guard<std::mutex> acquire(lock_);
-    if (messageIdPartitionMap.count(m) == 0) {
+    MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
+    if (messageIdPartitionMap.count(id) == 0) {
         std::set<MessageId>& partition = timePartitions.back();
-        bool emplace = messageIdPartitionMap.emplace(m, partition).second;
-        bool insert = partition.insert(m).second;
+        bool emplace = messageIdPartitionMap.emplace(id, partition).second;
+        bool insert = partition.insert(id).second;
         return emplace && insert;
     }
     return false;
@@ -106,13 +107,14 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
     return messageIdPartitionMap.empty();
 }
 
-bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
+bool UnAckedMessageTrackerEnabled::remove(const MessageId& msgId) {
     std::lock_guard<std::mutex> acquire(lock_);
+    MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
     bool removed = false;
 
-    std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
+    std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(id);
     if (exist != messageIdPartitionMap.end()) {
-        removed = exist->second.erase(m);
+        removed = exist->second.erase(id);
         messageIdPartitionMap.erase(exist);
     }
     return removed;
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 36753dc..12daf45 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -29,8 +29,8 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     ~UnAckedMessageTrackerEnabled();
     UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
     UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
-    bool add(const MessageId& m);
-    bool remove(const MessageId& m);
+    bool add(const MessageId& msgId);
+    bool remove(const MessageId& msgId);
     void removeMessagesTill(const MessageId& msgId);
     void removeTopicMessage(const std::string& topic);
     void timeoutHandler();
@@ -52,6 +52,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
 
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index 2278c05..e0d4091 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -20,6 +20,8 @@
 #include <thread>
 #include <time.h>
 #include <set>
+#include <map>
+#include <vector>
 
 #include "gtest/gtest.h"
 
@@ -345,4 +347,70 @@ TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
     client.close();
 }
 
+TEST(ConsumerTest, testBatchUnAckedMessageTracker) {
+    Client client(lookupUrl);
+    const std::string topic = "testBatchUnAckedMessageTracker" + std::to_string(time(nullptr));
+    std::string subName = "sub-batch-un-acked-msg-tracker";
+    constexpr int numOfMessages = 50;
+    constexpr int batchSize = 5;
+    constexpr int batchCount = numOfMessages / batchSize;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    consumerConfig.setTickDurationInMs(tickDurationInMs);
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subName, consumerConfig, consumer));
+    auto consumerImplPtr = PulsarFriend::getConsumerImplPtr(consumer);
+    auto tracker =
+        static_cast<UnAckedMessageTrackerEnabled*>(consumerImplPtr->unAckedMessageTrackerPtr_.get());
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(true);
+    producerConfig.setBlockIfQueueFull(true);
+    producerConfig.setBatchingMaxMessages(batchSize);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfig, producer));
+    std::string prefix = "message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.sendAsync(msg, NULL);
+    }
+    producer.close();
+
+    std::map<MessageId, std::vector<MessageId>> msgIdInBatchMap;
+    std::vector<MessageId> messageIds;
+    for (auto i = 0; i < numOfMessages; ++i) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        MessageId msgId = msg.getMessageId();
+        MessageId id(msgId.partition(), msgId.ledgerId(), msgId.entryId(), -1);
+        msgIdInBatchMap[id].emplace_back(msgId);
+    }
+
+    ASSERT_EQ(batchCount, msgIdInBatchMap.size());
+    ASSERT_EQ(batchCount, tracker->size());
+    for (const auto& iter : msgIdInBatchMap) {
+        ASSERT_EQ(iter.second.size(), batchSize);
+    }
+
+    int ackedBatchCount = 0;
+    for (auto iter = msgIdInBatchMap.begin(); iter != msgIdInBatchMap.end(); ++iter) {
+        ASSERT_EQ(batchSize, iter->second.size());
+        for (auto i = 0; i < iter->second.size(); ++i) {
+            ASSERT_EQ(ResultOk, consumer.acknowledge(iter->second[i]));
+        }
+        ackedBatchCount++;
+        ASSERT_EQ(batchCount - ackedBatchCount, tracker->size());
+    }
+    ASSERT_EQ(0, tracker->size());
+    ASSERT_TRUE(tracker->isEmpty());
+
+    consumer.close();
+    client.close();
+}
+
 }  // namespace pulsar