You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/28 08:53:33 UTC
[rocketmq-clients] branch cpp_dev updated: Retry ack/nack/forward-to-dlq of messages unless the lifecycle completes: success or receipt-handle becomes invalid
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/cpp_dev by this push:
new 24bc2ce Retry ack/nack/forward-to-dlq of messages unless the lifecycle completes: success or receipt-handle becomes invalid
24bc2ce is described below
commit 24bc2ce5ba9a73df53b610729ee73fda6973fab0
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jul 28 16:53:23 2022 +0800
Retry ack/nack/forward-to-dlq of messages unless the lifecycle completes: success or receipt-handle becomes invalid
---
cpp/source/rocketmq/ConsumeTask.cpp | 62 +++++++++++++++++++++----------
cpp/source/rocketmq/include/ConsumeTask.h | 2 +
2 files changed, 44 insertions(+), 20 deletions(-)
diff --git a/cpp/source/rocketmq/ConsumeTask.cpp b/cpp/source/rocketmq/ConsumeTask.cpp
index 9e87828..5981d07 100644
--- a/cpp/source/rocketmq/ConsumeTask.cpp
+++ b/cpp/source/rocketmq/ConsumeTask.cpp
@@ -18,11 +18,12 @@
#include "ConsumeTask.h"
#include "ConsumeStats.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "rocketmq/ConsumeResult.h"
+#include "rocketmq/ErrorCode.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -70,42 +71,56 @@ void ConsumeTask::schedule() {
}
void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
- if (task->fifo_ && ec) {
- auto service = task->service_.lock();
- task->next_step_ = NextStep::Ack;
- task->schedule();
- } else {
- // If it is not FIFO or ack operation succeeded
+ // Treat both success and invalid-receipt-handle as completion
+ if (!ec || ec == ErrorCode::InvalidReceiptHandle) {
task->pop();
task->next_step_ = NextStep::Consume;
+ task->submit();
+ return;
}
- task->submit();
+
+ // Try to ack again later
+ SPDLOG_WARN("Failed to ack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
+ ec.message());
+ task->next_step_ = NextStep::Ack;
+ task->schedule();
}
void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(!task->fifo_);
assert(!task->messages_.empty());
- if (ec) {
- SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}", task->messages_[0]->id(), ec.message());
+
+ // Treat both success and invalid-receipt-handle as completion
+ if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
+ task->pop();
+ task->next_step_ = NextStep::Consume;
+ task->submit();
+ return;
}
- task->pop();
- task->next_step_ = NextStep::Consume;
- task->submit();
+
+ SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
+ ec.message());
+ task->next_step_ = NextStep::Nack;
+ task->schedule();
}
void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(task->fifo_);
assert(!task->messages_.empty());
- if (ec) {
- SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ", task->messages_[0]->id());
- task->next_step_ = NextStep::Forward;
- task->schedule();
- } else {
- SPDLOG_DEBUG("Message[message-id={}] forwarded to DLQ", task->messages_[0]->id());
+
+ // Treat both success and invalid-receipt-handle as completion
+ if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
+ SPDLOG_DEBUG("Message[message-id={}] is forwarded to DLQ", task->messages_[0]->id());
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
+ return;
}
+
+ SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ. Cause: {}. Action: retry after 1s.",
+ task->messages_[0]->id(), ec.message());
+ task->next_step_ = NextStep::Forward;
+ task->schedule();
}
void ConsumeTask::process() {
@@ -199,6 +214,13 @@ void ConsumeTask::process() {
svc->ack(*messages_[0], callback);
break;
}
+
+ case NextStep::Nack: {
+ auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
+ svc->nack(*messages_[0], callback);
+ break;
+ }
+
case NextStep::Forward: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
diff --git a/cpp/source/rocketmq/include/ConsumeTask.h b/cpp/source/rocketmq/include/ConsumeTask.h
index 80ca971..38acfc3 100644
--- a/cpp/source/rocketmq/include/ConsumeTask.h
+++ b/cpp/source/rocketmq/include/ConsumeTask.h
@@ -41,6 +41,8 @@ enum class NextStep : std::uint8_t
*/
Ack,
+ Nack,
+
/**
* @brief Forward the head, aka, messages_[0], to dead-letter-queue.
*/