You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/15 04:08:17 UTC

[rocketmq-clients] 03/03: Log TooManyRequest

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 513de8f3eaa802f445f10928312614016f866b3b
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Mar 15 11:50:43 2023 +0800

    Log TooManyRequest
---
 csharp/rocketmq-client-csharp/Producer.cs | 149 ++++++++++++++++--------------
 diff_sec.zip                              | Bin 0 -> 174 bytes
 2 files changed, 79 insertions(+), 70 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index da7e8d03..392c50d4 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -168,39 +168,8 @@ namespace Org.Apache.Rocketmq
                 ? publishingLoadBalancer.TakeMessageQueues(new HashSet<Endpoints>(Isolated.Keys), maxAttempts)
                 : new List<MessageQueue>
                     { publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) };
-            Exception exception = null;
-            for (var attempt = 1; attempt <= maxAttempts; attempt++)
-            {
-                var stopwatch = Stopwatch.StartNew();
-                try
-                {
-                    var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts);
-                    return sendReceipt;
-                }
-                catch (Exception e)
-                {
-                    exception = e;
-                }
-                finally
-                {
-                    var elapsed = stopwatch.Elapsed.Milliseconds;
-                    _sendCostTimeHistogram.Record(elapsed,
-                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
-                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
-                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
-                            null == exception ? MetricConstant.Success : MetricConstant.Failure));
-                    // Retry immediately if the request is not throttled.
-                    if (exception is TooManyRequestsException)
-                    {
-                        var nextAttempt = 1 + attempt;
-                        var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
-                        await Task.Delay(delay);
-                    }
-                }
-            }
-
-            throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
-                exception);
+            var sendReceipt = await Send0(publishingMessage, candidates);
+            return sendReceipt;
         }
 
         public async Task<ISendReceipt> Send(Message message)
@@ -236,53 +205,93 @@ namespace Org.Apache.Rocketmq
             };
         }
 
-        private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates, int attempt,
-            int maxAttempts)
+        private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates)
         {
-            var candidateIndex = (attempt - 1) % candidates.Count;
-            var mq = candidates[candidateIndex];
-            if (PublishingSettings.IsValidateMessageType() &&
-                !mq.AcceptMessageTypes.Contains(message.MessageType))
-            {
-                throw new ArgumentException("Current message type does not match with the accept message types," +
-                                            $" topic={message.Topic}, actualMessageType={message.MessageType}" +
-                                            $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
-            }
-
-            var sendMessageRequest = WrapSendMessageRequest(message, mq);
-            var endpoints = mq.Broker.Endpoints;
-            var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
-            try
+            var retryPolicy = GetRetryPolicy();
+            var maxAttempts = retryPolicy.GetMaxAttempts();
+            Exception exception = null;
+            for (var attempt = 1; attempt <= maxAttempts; attempt++)
             {
-                var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+                var stopwatch = Stopwatch.StartNew();
 
-                var sendReceipt = sendReceipts.First();
-                if (attempt > 1)
+                var candidateIndex = (attempt - 1) % candidates.Count;
+                var mq = candidates[candidateIndex];
+                if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType))
                 {
-                    Logger.Info(
-                        $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
-                        $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+                    throw new ArgumentException(
+                        "Current message type does not match with the accept message types," +
+                        $" topic={message.Topic}, actualMessageType={message.MessageType}" +
+                        $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
                 }
 
-                return sendReceipt;
-            }
-            catch (Exception e)
-            {
-                // Isolate current endpoints.
-                Isolated[endpoints] = true;
-                if (attempt >= maxAttempts)
+                var sendMessageRequest = WrapSendMessageRequest(message, mq);
+                var endpoints = mq.Broker.Endpoints;
+                try
                 {
-                    Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
-                                    $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
-                                    $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
-                    throw;
+                    var invocation =
+                        await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+                    var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+
+                    var sendReceipt = sendReceipts.First();
+                    if (attempt > 1)
+                    {
+                        Logger.Info(
+                            $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+                            $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+                    }
+
+                    return sendReceipt;
                 }
+                catch (Exception e)
+                {
+                    // Isolate current endpoints.
+                    Isolated[endpoints] = true;
+                    if (attempt >= maxAttempts)
+                    {
+                        Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
+                                        $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
+                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                        throw;
+                    }
 
-                Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
-                               $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
-                               $" clientId={ClientId}");
-                throw;
+                    if (MessageType.Transaction == message.MessageType)
+                    {
+                        Logger.Error(e, "Failed to send transaction message, run out of attempt times, " +
+                                        $"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " +
+                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                        throw;
+                    }
+
+                    exception = e;
+                    if (exception is not TooManyRequestsException)
+                    {
+                        // Retry immediately if the request is not throttled.
+                        Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
+                                       $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
+                                       $" clientId={ClientId}");
+                        continue;
+                    }
+
+                    var nextAttempt = 1 + attempt;
+                    var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
+                    await Task.Delay(delay);
+                    Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " +
+                                   $"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " +
+                                   $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                }
+                finally
+                {
+                    var elapsed = stopwatch.Elapsed.Milliseconds;
+                    _sendCostTimeHistogram.Record(elapsed,
+                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
+                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
+                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
+                            null == exception ? MetricConstant.Success : MetricConstant.Failure));
+                }
             }
+
+            throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
+                exception);
         }
 
         internal override Settings GetSettings()
diff --git a/diff_sec.zip b/diff_sec.zip
new file mode 100644
index 00000000..89278e56
Binary files /dev/null and b/diff_sec.zip differ