You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/28 03:52:46 UTC

[rocketmq-clients] branch master updated: Retry after 20ms if receiving message is throttled (#79)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 884ad96  Retry after 20ms if receiving message is throttled (#79)
884ad96 is described below

commit 884ad960b6be1a544777f60c25cfb2a47a6feb45
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Thu Jul 28 11:52:42 2022 +0800

    Retry after 20ms if receiving message is throttled (#79)
    
    * Polish readme and build examples with cmake
    
    * Retry after 20ms once receiving message is throttled
---
 cpp/source/client/ReceiveMessageStreamReader.cpp       |  2 +-
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp    | 18 ++++++++++++------
 .../rocketmq/include/AsyncReceiveMessageCallback.h     |  2 +-
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 4eccb9f..03204c6 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -30,7 +30,7 @@ ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag
                                                        std::string peer_address,
                                                        rmq::ReceiveMessageRequest request,
                                                        std::unique_ptr<ReceiveMessageContext> context)
-    : client_manager_(client_manager),
+    : client_manager_(std::move(client_manager)),
       stub_(stub),
       peer_address_(std::move(peer_address)),
       request_(std::move(request)),
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1e03802..1c96b09 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -24,6 +24,7 @@
 #include "spdlog/spdlog.h"
 #include "ProcessQueue.h"
 #include "PushConsumerImpl.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -44,9 +45,15 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
     return;
   }
 
+  if (ec == ErrorCode::TooManyRequests) {
+    SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName());
+    receiveMessageLater(std::chrono::milliseconds(20));
+    return;
+  }
+
   if (ec) {
-    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Attempt later.", process_queue->simpleName(), ec.message());
-    receiveMessageLater();
+    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
+    receiveMessageLater(std::chrono::seconds (1));
     return;
   }
 
@@ -70,14 +77,14 @@ void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
     SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
                 process_queue->simpleName());
     process_queue->syncIdleState();
-    receiveMessageLater();
+    receiveMessageLater(std::chrono::seconds(1));
   } else {
     // Receive message immediately
     receiveMessageImmediately();
   }
 }
 
-void AsyncReceiveMessageCallback::receiveMessageLater() {
+void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     return;
@@ -93,8 +100,7 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
     }
   };
 
-  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
-                                            std::chrono::seconds(0));
+  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0));
 }
 
 void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b3fe2b1..5a13442 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -31,7 +31,7 @@ public:
 
   void onCompletion(const std::error_code& ec, const ReceiveMessageResult& result);
 
-  void receiveMessageLater();
+  void receiveMessageLater(std::chrono::milliseconds delay);
 
   void receiveMessageImmediately();