You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/07 13:51:01 UTC

[GitHub] [pulsar-client-cpp] shibd opened a new pull request, #23: [feat] Support acknowledging a list of messages

shibd opened a new pull request, #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23

   ### Motivation
   https://github.com/apache/pulsar/issues/17428
   
   ### Modifications
   - Add acknowledge message id list API.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   https://github.com/shibd/pulsar-client-cpp/pull/3
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010287985


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   You can test this example:
   
   ```c++
   #include <chrono>
   #include <iostream>
   #include <thread>
   using namespace std;
   
   void f() {
     int x = 0;
     int a[1];  // do nothing
     std::thread t([&x] {
       std::this_thread::sleep_for(std::chrono::milliseconds(100));
       cout << std::this_thread::get_id() << " " << &x << " " << x << endl;
     });
     t.detach();
     cout << std::this_thread::get_id() << " " << &x << " " << x << endl;
   }
   
   int main() {
     f();
     std::this_thread::sleep_for(std::chrono::milliseconds(100));
   }
   ```
   
   ```bash
   $ g++ 1.cc -std=c++11 -pthread
   $ ./a.out
   140118122346304 0x7ffffa28a90c 0
   140118122342144 0x7ffffa28a90c 32623
   $ ./a.out
   139920470566720 0x7fffd23f916c 0
   139920470562560 0x7fffd23f916c 32767
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010398877


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   Thank your explanation, I remove `res`. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#issuecomment-1298125364

   @BewareMyPower @RobertIndie @Demogorgon314  PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010280775


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   `res` should be defined inside the lambda, not outside. Because it's only used in the lambda.
   
   BTW, please keep it in mind that **never capture a local variable by reference** unless the lambda will only be called inside the scope. 
   1. The lifetime of the local variable will end after going out of the scope, while the lambda could be invoked in another thread. 
   2. Since multiple threads can access the shared local variable, it must be atomic.
   
   That's also why `std::shared_ptr<std::atomic<T>>` is used in other places.
   1. Capturing a `std::shared_ptr<U>`  **by value** will increase the reference count and extend the lifetime of the `U` instance until all lambdas finished.
   2. `U` is `std::atomic<T>`, which is thread safe to access among different threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010280775


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   `res` should be defined inside the lambda, not outside. Because it's only used in the lambda.
   
   BTW, please keep it in mind that never capture a reference of a local variable. 
   1. The lifetime of the local variable will end after going out of the scope, while the lambda could be invoked in another thread. 
   2. Since multiple threads can access the shared local variable, it must be atomic.
   
   That's also why `std::shared_ptr<std::atomic<T>>` is used in other places.
   1. Capturing a `std::shared_ptr<U>`  will increase the reference count and extend the lifetime of the `U` instance until all lambdas finished.
   2. `U` is `std::atomic<T>`, which is thread safe to access among different threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010270468


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   The combination of two method calls on an atomic variable is not atomic. You can simply write `if (--(*needCallBack) == 0)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010294373


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }
+    };
+    for (const auto& kv : topicToMessageId) {
+        auto optConsumer = consumers_.find(kv.first);
+        if (optConsumer.is_present()) {
+            unAckedMessageTrackerPtr_->remove(kv.second);
+            optConsumer.value()->acknowledgeAsync(kv.second, cb);
+        } else {
+            LOG_ERROR("Message of topic: " << kv.first << " not in unAckedMessageTracker");

Review Comment:
   This log is misleading. `consumers_` is not `UnAckedMessageTracker`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower merged pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#issuecomment-1298331297

   Overall LGTM except the comments I've mentioned.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010443325


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,42 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    auto cb = [callback, needCallBack](Result result) {
+        if (result != ResultOk) {
+            LOG_ERROR("Filed when acknowledge list: " << result);
+            callback(result);
+            // set needCallBack is -1 to avoid repeated callback.
+            needCallBack->store(-1);

Review Comment:
   Make sense. Add `return` can avoid one calculation and judgment. I added it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010434404


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,42 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    auto cb = [callback, needCallBack](Result result) {
+        if (result != ResultOk) {
+            LOG_ERROR("Filed when acknowledge list: " << result);
+            callback(result);
+            // set needCallBack is -1 to avoid repeated callback.
+            needCallBack->store(-1);

Review Comment:
   It's better to call `needCallBack->store(-1)` first  because `callback` is passed from the application side and it could took some time to return. And should we add a `return` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #23: [feat] Support acknowledging a list of messages

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010280775


##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 }
 
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    std::unordered_map<std::string, MessageIdList> topicToMessageId;
+    for (const MessageId& messageId : messageIdList) {
+        auto topicName = messageId.getTopicName();
+        topicToMessageId[topicName].emplace_back(messageId);
+    }
+
+    auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
+    Result res = ResultOk;
+    auto cb = [callback, needCallBack, &res](Result result) {
+        if (result != ResultOk) {
+            res = result;
+        }
+        needCallBack->fetch_sub(1);
+        if (needCallBack->load() == 0) {
+            callback(res);
+        }

Review Comment:
   `res` should be defined inside the lambda, not outside. Because it's only used in the lambda.
   
   BTW, please keep it in mind that **never capture a local variable by reference**. 
   1. The lifetime of the local variable will end after going out of the scope, while the lambda could be invoked in another thread. 
   2. Since multiple threads can access the shared local variable, it must be atomic.
   
   That's also why `std::shared_ptr<std::atomic<T>>` is used in other places.
   1. Capturing a `std::shared_ptr<U>`  **by value** will increase the reference count and extend the lifetime of the `U` instance until all lambdas finished.
   2. `U` is `std::atomic<T>`, which is thread safe to access among different threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org