You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:59:12 UTC

[pulsar] branch branch-2.5 updated: [pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 468bb21  [pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498)
468bb21 is described below

commit 468bb21d1261f7e02006434a60351346b0989feb
Author: k2la <mz...@gmail.com>
AuthorDate: Mon Mar 9 16:45:37 2020 +0900

    [pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498)
    
    ### Motivation
    Because of #6391, acknowledged messages were still counted as unacked messages.
    Even though the messages received from the broker had been acknowledged, the following log was output:
    
    ```
    2020-03-06 19:44:51.790 INFO  ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650]
    my-message-0: Fri Mar  6 19:45:05 2020
    my-message-1: Fri Mar  6 19:45:05 2020
    my-message-2: Fri Mar  6 19:45:05 2020
    2020-03-06 19:45:15.818 INFO  UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time
    
    ```
    
    This behavior was observed on the master branch.
    (cherry picked from commit 67f8cf30d33f3cb7e8d9309cadb2d80626dd25bc)
---
 pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 14 +++++++++-----
 pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h  |  3 +--
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 7894e64..9185dba 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -90,8 +90,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
 bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
     if (messageIdPartitionMap.count(m) == 0) {
-        bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
-        return insert && timePartitions.back().insert(m).second;
+        std::set<MessageId>& partition = timePartitions.back();
+        bool emplace = messageIdPartitionMap.emplace(m, partition).second;
+        bool insert = partition.insert(m).second;
+        return emplace && insert;
     }
     return false;
 }
@@ -104,7 +106,8 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
 bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
     std::lock_guard<std::mutex> acquire(lock_);
     bool removed = false;
-    std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
+
+    std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
     if (exist != messageIdPartitionMap.end()) {
         removed = exist->second.erase(m);
     }
@@ -121,7 +124,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
         MessageId msgIdInMap = it->first;
         if (msgIdInMap < msgId) {
-            std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
+            std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
             if (exist != messageIdPartitionMap.end()) {
                 exist->second.erase(msgId);
             }
@@ -135,7 +138,8 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
     for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
         MessageId msgIdInMap = it->first;
         if (msgIdInMap.getTopicName().compare(topic) == 0) {
-            std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
+            std::map<MessageId, std::set<MessageId>&>::iterator exist =
+                messageIdPartitionMap.find(msgIdInMap);
             if (exist != messageIdPartitionMap.end()) {
                 exist->second.erase(msgIdInMap);
             }
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index c2b4012..9195b30 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -23,7 +23,6 @@
 #include <mutex>
 
 namespace pulsar {
-
 class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
    public:
     ~UnAckedMessageTrackerEnabled();
@@ -41,7 +40,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     void timeoutHandlerHelper();
     bool isEmpty();
     long size();
-    std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
+    std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
     std::deque<std::set<MessageId>> timePartitions;
     std::mutex lock_;
     DeadlineTimerPtr timer_;