You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/28 03:51:18 UTC

[rocketmq-clients] branch cpp_dev updated: Retry after 20ms once receiving message is throttled

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/cpp_dev by this push:
     new 0ec359a  Retry after 20ms once receiving message is throttled
0ec359a is described below

commit 0ec359ac33b3b93251d0e5bc30bac497ebb1ff65
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jul 28 11:51:02 2022 +0800

    Retry after 20ms once receiving message is throttled
---
 cpp/source/client/ReceiveMessageStreamReader.cpp       |  2 +-
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp    | 18 ++++++++++++------
 .../rocketmq/include/AsyncReceiveMessageCallback.h     |  2 +-
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 4eccb9f..03204c6 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -30,7 +30,7 @@ ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag
                                                        std::string peer_address,
                                                        rmq::ReceiveMessageRequest request,
                                                        std::unique_ptr<ReceiveMessageContext> context)
-    : client_manager_(client_manager),
+    : client_manager_(std::move(client_manager)),
       stub_(stub),
       peer_address_(std::move(peer_address)),
       request_(std::move(request)),
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1e03802..1c96b09 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -24,6 +24,7 @@
 #include "spdlog/spdlog.h"
 #include "ProcessQueue.h"
 #include "PushConsumerImpl.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -44,9 +45,15 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
     return;
   }
 
+  if (ec == ErrorCode::TooManyRequests) {
+    SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
+    receiveMessageLater(std::chrono::milliseconds(20));
+    return;
+  }
+
   if (ec) {
-    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Attempt later.", process_queue->simpleName(), ec.message());
-    receiveMessageLater();
+    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
+    receiveMessageLater(std::chrono::seconds (1));
     return;
   }
 
@@ -70,14 +77,14 @@ void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
     SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
                 process_queue->simpleName());
     process_queue->syncIdleState();
-    receiveMessageLater();
+    receiveMessageLater(std::chrono::seconds(1));
   } else {
     // Receive message immediately
     receiveMessageImmediately();
   }
 }
 
-void AsyncReceiveMessageCallback::receiveMessageLater() {
+void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     return;
@@ -93,8 +100,7 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
     }
   };
 
-  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
-                                            std::chrono::seconds(0));
+  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
 }
 
 void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b3fe2b1..5a13442 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -31,7 +31,7 @@ public:
 
   void onCompletion(const std::error_code& ec, const ReceiveMessageResult& result);
 
-  void receiveMessageLater();
+  void receiveMessageLater(std::chrono::milliseconds delay);
 
   void receiveMessageImmediately();