Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 09:06:40 UTC

[GitHub] [pulsar] shibd opened a new pull request, #17739: [improve][cpp] Support acknowledging a list of messages

shibd opened a new pull request, #17739:
URL: https://github.com/apache/pulsar/pull/17739

   ### Motivation
   https://github.com/apache/pulsar/issues/17428
   
   ### Modifications
   - Add an API for acknowledging a list of message IDs.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   https://github.com/shibd/pulsar/pull/10
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r977163674


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -925,9 +926,21 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
     doAcknowledgeIndividual(msgId, cb);
 }
 
+void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, shared_from_this(), std::placeholders::_1,
+                                  callback, proto::CommandAck_AckType_Individual, messageIdList.size());
+    // Currently not supported batch message id individual index ack.
+    this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
+    for (const auto& messageId : messageIdList) {
+        this->unAckedMessageTrackerPtr_->remove(messageId);
+        this->batchAcknowledgementTracker_.deleteAckedMessage(messageId, proto::CommandAck::Individual);
+    }

Review Comment:
   Yes, it is better to provide methods that accept a `MessageIdList`. I will add them.
   
   But I would like to understand what the race condition looks like in a real scenario. For `UnAckedMessageTracker`, would it be the following?
   
   Given a `MessageIdList`: {1, 2, 3, 4}
   
   On the acknowledge thread:
   
   ``` c++
   for (const auto& msgId : messageIdList) {
        this->unAckedMessageTrackerPtr_->remove(msgId);
   }
   ```
   
   while another thread calls `this->unAckedMessageTrackerPtr_->add(msgId);`
   
   





[GitHub] [pulsar] shibd commented on pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#issuecomment-1253686949

   > Could you rebase to master? I saw some strange errors in CI.
   
   I rebased onto master, PTAL.




[GitHub] [pulsar] shibd closed pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd closed pull request #17739: [improve][cpp] Support acknowledging a list of messages
URL: https://github.com/apache/pulsar/pull/17739




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976521719


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -925,9 +926,21 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
     doAcknowledgeIndividual(msgId, cb);
 }
 
+void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, shared_from_this(), std::placeholders::_1,
+                                  callback, proto::CommandAck_AckType_Individual, messageIdList.size());
+    // Currently not supported batch message id individual index ack.
+    this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
+    for (const auto& messageId : messageIdList) {
+        this->unAckedMessageTrackerPtr_->remove(messageId);
+        this->batchAcknowledgementTracker_.deleteAckedMessage(messageId, proto::CommandAck::Individual);
+    }

Review Comment:
   We should add thread-safe `remove` and `deleteAckedMessage` overloads that accept a `MessageIdList` so that these calls stay thread safe. Both `UnAckedMessageTrackerEnabled::remove` and `BatchAcknowledgementTracker::deleteAckedMessage` are thread safe, so the `acknowledgeAsync` method for a single message won't lead to a race condition in either component (`unAckedMessageTrackerPtr_` and `batchAcknowledgementTracker_`).
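
A minimal sketch of the idea, not the client's actual code: `UnAckedTrackerSketch` is a hypothetical stand-in for the tracker, and message IDs are simplified to plain `int`s. The single-ID `remove` takes the lock once per call, while the list overload holds the lock for the whole list, so a concurrent `add` cannot interleave with a half-finished batch.

```cpp
#include <cstddef>
#include <mutex>
#include <set>
#include <vector>

// Hypothetical stand-in for the tracker; ids are plain ints for brevity.
class UnAckedTrackerSketch {
   public:
    void add(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        ids_.insert(id);
    }

    // Single-id removal: one lock acquisition per call.
    bool remove(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        return ids_.erase(id) > 0;
    }

    // List removal: the whole list is erased under one lock acquisition,
    // so other threads observe either none or all of the removals.
    std::size_t remove(const std::vector<int>& idList) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::size_t removed = 0;
        for (int id : idList) {
            removed += ids_.erase(id);
        }
        return removed;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return ids_.size();
    }

   private:
    mutable std::mutex mutex_;
    std::set<int> ids_;
};
```

Calling the list overload once replaces a loop of single-ID calls; whether that matters in practice depends on what other threads expect to see between individual removals.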





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976530744


##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        if (topicToMessageId.count(topicName) == 0) {
+            topicToMessageId.emplace(topicName, std::vector<MessageId>());
+        }
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    std::shared_ptr<std::atomic<int>> needCallBack =
+        std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    auto cb = [callback, needCallBack](Result result) {
+        if (result != ResultOk) {
+            callback(result);
+        }

Review Comment:
   If none of the results for the N message IDs is `ResultOk`, the callback will be called N times. We should guarantee that the callback is called only once.
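
One way to get a call-once guarantee is sketched below, under stated assumptions: `SketchResult` and `makeOnceCallback` are hypothetical names, not part of the Pulsar client. The user callback fires on the first failure or after the last success, whichever comes first, and an atomic flag suppresses every later invocation. Using the return value of `fetch_sub` also avoids a separate `load()`, which two threads could both observe as zero.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>

// Hypothetical result type; the real client uses pulsar::Result.
enum class SketchResult { Ok, Error };
using SketchCallback = std::function<void(SketchResult)>;

// Returns a callback to hand to each of `pending` sub-operations.
// The user callback runs exactly once: on the first failure, or once
// all sub-operations have succeeded. Assumes pending > 0.
inline SketchCallback makeOnceCallback(std::size_t pending, SketchCallback userCallback) {
    auto remaining = std::make_shared<std::atomic<std::size_t>>(pending);
    auto fired = std::make_shared<std::atomic<bool>>(false);
    return [remaining, fired, userCallback](SketchResult result) {
        // fetch_sub returns the previous value, so exactly one caller sees 1.
        const bool last = remaining->fetch_sub(1) == 1;
        if (result != SketchResult::Ok || last) {
            // exchange returns the old flag; only the first caller sees false.
            if (!fired->exchange(true)) {
                userCallback(result);
            }
        }
    };
}
```

With three pending operations that report `Error`, `Error`, `Ok`, the user callback runs once with `Error`; with all successes it runs once after the final one.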





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976528019


##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        if (topicToMessageId.count(topicName) == 0) {
+            topicToMessageId.emplace(topicName, std::vector<MessageId>());
+        }
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    std::shared_ptr<std::atomic<int>> needCallBack =
+        std::make_shared<std::atomic<int>>(topicToMessageId.size());

Review Comment:
   ```suggestion
       auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
   ```
   
   For complicated types, it's better to use C++'s auto type deduction.
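
As a small illustration (the function name `counterSketch` is made up for this note), `auto` deduces exactly the type that was spelled out by hand in the diff, which the `static_assert` checks at compile time:

```cpp
#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>

// Hypothetical helper: builds a shared atomic counter and reads it back.
inline int counterSketch(std::size_t n) {
    auto counter = std::make_shared<std::atomic<int>>(static_cast<int>(n));
    // auto deduces the same type that would otherwise be written explicitly.
    static_assert(std::is_same<decltype(counter), std::shared_ptr<std::atomic<int>>>::value,
                  "auto should deduce std::shared_ptr<std::atomic<int>>");
    return counter->load();
}
```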



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;

Review Comment:
   We can use an `unordered_map` (hash map) here.



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        if (topicToMessageId.count(topicName) == 0) {
+            topicToMessageId.emplace(topicName, std::vector<MessageId>());
+        }

Review Comment:
   ```suggestion
   ```
   
   The following `topicToMessageId[topicName]` call already inserts a key-value if the key doesn't exist.
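
A sketch of the simplified grouping, assuming a hypothetical `TopicAndId` pair in place of `MessageId`. It uses `std::unordered_map`, as suggested in the previous comment, and relies on `operator[]` to create an empty vector the first time a topic is seen, so no explicit `count`/`emplace` check is needed.

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a MessageId: (topic name, id).
using TopicAndId = std::pair<std::string, int>;

inline std::unordered_map<std::string, std::vector<int>> groupByTopic(
    const std::vector<TopicAndId>& ids) {
    std::unordered_map<std::string, std::vector<int>> grouped;
    for (const auto& entry : ids) {
        // operator[] value-initializes an empty vector for a new key.
        grouped[entry.first].push_back(entry.second);
    }
    return grouped;
}
```

IDs keep their input order within each topic; the iteration order across topics is unspecified with a hash map, which should not matter if each group is dispatched independently.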





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r976542832


##########
pulsar-client-cpp/tests/PulsarFriend.h:
##########
@@ -57,6 +57,19 @@ class PulsarFriend {
         return std::static_pointer_cast<ConsumerStatsImpl>(consumerImpl->consumerStatsBasePtr_);
     }
 
+    // just param is MultiTopicConsumerImpl

Review Comment:
   This comment is confusing. I think you mean it should only be called when the underlying consumer of `consumer` is a `MultiTopicConsumerImpl`?
   
   If yes, please remove the confusing comment and use `dynamic_cast` with a null check instead of `static_cast` in the implementation. `dynamic_cast` checks the RTTI info, so it returns null if the cast is invalid, e.g.
   
   ```c++
   #include <iostream>
   using namespace std;
   
   struct Base {
       virtual ~Base() = default;  // makes `delete base` in test() well-defined for Derived
       virtual void f() {}
   };
   
   struct Derived : Base {
       void f() override {}
   };
   
   void test(Base* base) {
       cout << "dynamic_cast: " << dynamic_cast<Derived*>(base)
            << "\nstatic_cast: " << static_cast<Derived*>(base) << endl;
       delete base;
   }
   
   int main(int argc, char* argv[]) {
       test(new Base);
       test(new Derived);
       return 0;
   }
   ```
   
   You can see the difference from the output:
   
   ```
   dynamic_cast: 0x0
   static_cast: 0x600001eac020
   dynamic_cast: 0x600001eac020
   static_cast: 0x600001eac020
   ```
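
For consumers held in a `std::shared_ptr`, the same check can be written with `std::dynamic_pointer_cast`. The sketch below uses made-up types (`ConsumerBaseSketch`, `SingleTopicSketch`, `MultiTopicSketch`) rather than the real classes, and only shows the shape of the null check.

```cpp
#include <memory>

// Hypothetical class hierarchy standing in for the consumer implementations.
struct ConsumerBaseSketch {
    virtual ~ConsumerBaseSketch() = default;
};
struct SingleTopicSketch : ConsumerBaseSketch {};
struct MultiTopicSketch : ConsumerBaseSketch {
    int topics = 2;
};

// Returns nullptr when `consumer` is null or is not a MultiTopicSketch,
// so the caller can test the result instead of trusting a static cast.
inline std::shared_ptr<MultiTopicSketch> asMultiTopic(
    const std::shared_ptr<ConsumerBaseSketch>& consumer) {
    return std::dynamic_pointer_cast<MultiTopicSketch>(consumer);
}
```

A test helper built this way can return the null pointer to its caller or assert on it, rather than yielding a pointer of the wrong dynamic type.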
   



##########
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc:
##########
@@ -572,6 +572,46 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        if (topicToMessageId.count(topicName) == 0) {
+            topicToMessageId.emplace(topicName, std::vector<MessageId>());
+        }
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    std::shared_ptr<std::atomic<int>> needCallBack =
+        std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    auto cb = [callback, needCallBack](Result result) {
+        if (result != ResultOk) {
+            callback(result);
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(result);
+        }
+    };
+    for (const auto& kv : topicToMessageId) {
+        auto optConsumer = consumers_.find(kv.first);
+        if (optConsumer.is_present()) {
+            for (const auto& msgId : kv.second) {
+                unAckedMessageTrackerPtr_->remove(msgId);
+            }

Review Comment:
   As in my earlier comment, calling `remove` in a loop is not thread safe for `unAckedMessageTrackerPtr_`.





[GitHub] [pulsar] BewareMyPower commented on pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#issuecomment-1253344295

   Could you rebase to master? I saw some strange errors in CI.




[GitHub] [pulsar] shibd commented on a diff in pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#discussion_r977497200


##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -925,9 +926,21 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
     doAcknowledgeIndividual(msgId, cb);
 }
 
+void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback, shared_from_this(), std::placeholders::_1,
+                                  callback, proto::CommandAck_AckType_Individual, messageIdList.size());
+    // Currently not supported batch message id individual index ack.
+    this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
+    for (const auto& messageId : messageIdList) {
+        this->unAckedMessageTrackerPtr_->remove(messageId);
+        this->batchAcknowledgementTracker_.deleteAckedMessage(messageId, proto::CommandAck::Individual);
+    }

Review Comment:
   I added a new `remove(MessageIdList)` method to `UnAckedMessageTracker`. Since `BatchAcknowledgementTracker` does not support `Individual` acks, I removed the call to `this->batchAcknowledgementTracker_.deleteAckedMessage`. PTAL. Thanks.





[GitHub] [pulsar] shibd commented on pull request #17739: [improve][cpp] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #17739:
URL: https://github.com/apache/pulsar/pull/17739#issuecomment-1271624368

   Resubmitted the PR in the pulsar-client-cpp repository: https://github.com/apache/pulsar-client-cpp/pull/23

