You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/28 08:53:33 UTC

[rocketmq-clients] branch cpp_dev updated: Retry ack/nack/forward-to-dlq of messages until the lifecycle completes: the operation succeeds or the receipt handle becomes invalid

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/cpp_dev by this push:
     new 24bc2ce  Retry ack/nack/forward-to-dlq of messages until the lifecycle completes: the operation succeeds or the receipt handle becomes invalid
24bc2ce is described below

commit 24bc2ce5ba9a73df53b610729ee73fda6973fab0
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jul 28 16:53:23 2022 +0800

    Retry ack/nack/forward-to-dlq of messages until the lifecycle completes: the operation succeeds or the receipt handle becomes invalid
---
 cpp/source/rocketmq/ConsumeTask.cpp       | 62 +++++++++++++++++++++----------
 cpp/source/rocketmq/include/ConsumeTask.h |  2 +
 2 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/cpp/source/rocketmq/ConsumeTask.cpp b/cpp/source/rocketmq/ConsumeTask.cpp
index 9e87828..5981d07 100644
--- a/cpp/source/rocketmq/ConsumeTask.cpp
+++ b/cpp/source/rocketmq/ConsumeTask.cpp
@@ -18,11 +18,12 @@
 #include "ConsumeTask.h"
 
 #include "ConsumeStats.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
 #include "PushConsumerImpl.h"
 #include "Tag.h"
 #include "rocketmq/ConsumeResult.h"
+#include "rocketmq/ErrorCode.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -70,42 +71,56 @@ void ConsumeTask::schedule() {
 }
 
 void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
-  if (task->fifo_ && ec) {
-    auto service = task->service_.lock();
-    task->next_step_ = NextStep::Ack;
-    task->schedule();
-  } else {
-    // If it is not FIFO or ack operation succeeded
+  // Treat both success and invalid-receipt-handle as completion
+  if (!ec || ec == ErrorCode::InvalidReceiptHandle) {
     task->pop();
     task->next_step_ = NextStep::Consume;
+    task->submit();
+    return;
   }
-  task->submit();
+
+  // Try to ack again later
+  SPDLOG_WARN("Failed to ack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
+              ec.message());
+  task->next_step_ = NextStep::Ack;
+  task->schedule();
 }
 
 void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
   assert(!task->fifo_);
   assert(!task->messages_.empty());
-  if (ec) {
-    SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}", task->messages_[0]->id(), ec.message());
+
+  // Treat both success and invalid-receipt-handle as completion
+  if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
+    task->pop();
+    task->next_step_ = NextStep::Consume;
+    task->submit();
+    return;
   }
-  task->pop();
-  task->next_step_ = NextStep::Consume;
-  task->submit();
+
+  SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
+              ec.message());
+  task->next_step_ = NextStep::Nack;
+  task->schedule();
 }
 
 void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
   assert(task->fifo_);
   assert(!task->messages_.empty());
-  if (ec) {
-    SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ", task->messages_[0]->id());
-    task->next_step_ = NextStep::Forward;
-    task->schedule();
-  } else {
-    SPDLOG_DEBUG("Message[message-id={}] forwarded to DLQ", task->messages_[0]->id());
+
+  // Treat both success and invalid-receipt-handle as completion
+  if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
+    SPDLOG_DEBUG("Message[message-id={}] is forwarded to DLQ", task->messages_[0]->id());
     task->pop();
     task->next_step_ = NextStep::Consume;
     task->submit();
+    return;
   }
+
+  SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ. Cause: {}.  Action: retry after 1s.",
+               task->messages_[0]->id(), ec.message());
+  task->next_step_ = NextStep::Forward;
+  task->schedule();
 }
 
 void ConsumeTask::process() {
@@ -199,6 +214,13 @@ void ConsumeTask::process() {
       svc->ack(*messages_[0], callback);
       break;
     }
+
+    case NextStep::Nack: {
+      auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
+      svc->nack(*messages_[0], callback);
+      break;
+    }
+
     case NextStep::Forward: {
       assert(!messages_.empty());
       auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
diff --git a/cpp/source/rocketmq/include/ConsumeTask.h b/cpp/source/rocketmq/include/ConsumeTask.h
index 80ca971..38acfc3 100644
--- a/cpp/source/rocketmq/include/ConsumeTask.h
+++ b/cpp/source/rocketmq/include/ConsumeTask.h
@@ -41,6 +41,8 @@ enum class NextStep : std::uint8_t
    */
   Ack,
 
+  Nack,
+
   /**
    * @brief Forward the head, aka, messages_[0], to dead-letter-queue.
    */