You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by "shibd (via GitHub)" <gi...@apache.org> on 2023/03/28 13:40:33 UTC

[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #232: Support waiting for the ACK response

shibd commented on code in PR #232:
URL: https://github.com/apache/pulsar-client-cpp/pull/232#discussion_r1150590482


##########
lib/ConsumerImpl.cc:
##########
@@ -1089,12 +1105,13 @@ void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callb
     const auto& msgIdToAck = pair.first;
     const bool readyToAck = pair.second;
     if (readyToAck) {
-        ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck);
+        ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck, callback);
+    } else {
+        if (callback) {

Review Comment:
   Why do direct callback when readyToAck is false? 
   
   If batchIndexAck = false, and the batch message does not have ack all messages, here also needs to wait(if ackReceiptEnabled == true)?



##########
lib/AckGroupingTracker.cc:
##########
@@ -62,25 +74,39 @@ static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msg
     return os;
 }
 
-bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
-                                        const std::set<MessageId>& msgIds) {
-    auto cnx = connWeakPtr.lock();
-    if (cnx == nullptr) {
-        LOG_DEBUG("Connection is not ready, ACK failed.");
-        return false;
+void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
+    const auto cnx = connectionSupplier_();
+    if (!cnx) {
+        LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
+        return;
     }
 
     if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
-        auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
-        cnx->sendCommand(cmd);
-        LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
+        if (waitResponse_) {
+            const auto requestId = requestIdSupplier_();
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
+                .addListener([callback](Result result, const ResponseData&) {
+                    if (callback) {
+                        callback(result);
+                    }
+                });
+        } else {
+            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));

Review Comment:
   Here also need to callback?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org