You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/28 03:52:46 UTC
[rocketmq-clients] branch master updated: Retry after 20ms if receiving message is throttled (#79)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 884ad96 Retry after 20ms if receiving message is throttled (#79)
884ad96 is described below
commit 884ad960b6be1a544777f60c25cfb2a47a6feb45
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Thu Jul 28 11:52:42 2022 +0800
Retry after 20ms if receiving message is throttled (#79)
* Polish readme and build examples with cmake
* Retry after 20ms once receiving message is throttled
---
cpp/source/client/ReceiveMessageStreamReader.cpp | 2 +-
cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 18 ++++++++++++------
.../rocketmq/include/AsyncReceiveMessageCallback.h | 2 +-
3 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 4eccb9f..03204c6 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -30,7 +30,7 @@ ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag
std::string peer_address,
rmq::ReceiveMessageRequest request,
std::unique_ptr<ReceiveMessageContext> context)
- : client_manager_(client_manager),
+ : client_manager_(std::move(client_manager)),
stub_(stub),
peer_address_(std::move(peer_address)),
request_(std::move(request)),
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1e03802..1c96b09 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -24,6 +24,7 @@
#include "spdlog/spdlog.h"
#include "ProcessQueue.h"
#include "PushConsumerImpl.h"
+#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -44,9 +45,15 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
return;
}
+ if (ec == ErrorCode::TooManyRequests) {
+ SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
+ receiveMessageLater(std::chrono::milliseconds(20));
+ return;
+ }
+
if (ec) {
- SPDLOG_WARN("Receive message from {} failed. Cause: {}. Attempt later.", process_queue->simpleName(), ec.message());
- receiveMessageLater();
+ SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
+ receiveMessageLater(std::chrono::seconds (1));
return;
}
@@ -70,14 +77,14 @@ void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
process_queue->simpleName());
process_queue->syncIdleState();
- receiveMessageLater();
+ receiveMessageLater(std::chrono::seconds(1));
} else {
// Receive message immediately
receiveMessageImmediately();
}
}
-void AsyncReceiveMessageCallback::receiveMessageLater() {
+void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
@@ -93,8 +100,7 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
}
};
- client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
- std::chrono::seconds(0));
+ client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
}
void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b3fe2b1..5a13442 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -31,7 +31,7 @@ public:
void onCompletion(const std::error_code& ec, const ReceiveMessageResult& result);
- void receiveMessageLater();
+ void receiveMessageLater(std::chrono::milliseconds delay);
void receiveMessageImmediately();