You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/15 04:08:17 UTC
[rocketmq-clients] 03/03: Log TooManyRequest
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 513de8f3eaa802f445f10928312614016f866b3b
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Mar 15 11:50:43 2023 +0800
Log TooManyRequest
---
csharp/rocketmq-client-csharp/Producer.cs | 149 ++++++++++++++++--------------
diff_sec.zip | Bin 0 -> 174 bytes
2 files changed, 79 insertions(+), 70 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index da7e8d03..392c50d4 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -168,39 +168,8 @@ namespace Org.Apache.Rocketmq
? publishingLoadBalancer.TakeMessageQueues(new HashSet<Endpoints>(Isolated.Keys), maxAttempts)
: new List<MessageQueue>
{ publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) };
- Exception exception = null;
- for (var attempt = 1; attempt <= maxAttempts; attempt++)
- {
- var stopwatch = Stopwatch.StartNew();
- try
- {
- var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts);
- return sendReceipt;
- }
- catch (Exception e)
- {
- exception = e;
- }
- finally
- {
- var elapsed = stopwatch.Elapsed.Milliseconds;
- _sendCostTimeHistogram.Record(elapsed,
- new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
- new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
- new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
- null == exception ? MetricConstant.Success : MetricConstant.Failure));
- // Retry immediately if the request is not throttled.
- if (exception is TooManyRequestsException)
- {
- var nextAttempt = 1 + attempt;
- var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
- await Task.Delay(delay);
- }
- }
- }
-
- throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
- exception);
+ var sendReceipt = await Send0(publishingMessage, candidates);
+ return sendReceipt;
}
public async Task<ISendReceipt> Send(Message message)
@@ -236,53 +205,93 @@ namespace Org.Apache.Rocketmq
};
}
- private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates, int attempt,
- int maxAttempts)
+ private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates)
{
- var candidateIndex = (attempt - 1) % candidates.Count;
- var mq = candidates[candidateIndex];
- if (PublishingSettings.IsValidateMessageType() &&
- !mq.AcceptMessageTypes.Contains(message.MessageType))
- {
- throw new ArgumentException("Current message type does not match with the accept message types," +
- $" topic={message.Topic}, actualMessageType={message.MessageType}" +
- $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
- }
-
- var sendMessageRequest = WrapSendMessageRequest(message, mq);
- var endpoints = mq.Broker.Endpoints;
- var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
- try
+ var retryPolicy = GetRetryPolicy();
+ var maxAttempts = retryPolicy.GetMaxAttempts();
+ Exception exception = null;
+ for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
- var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+ var stopwatch = Stopwatch.StartNew();
- var sendReceipt = sendReceipts.First();
- if (attempt > 1)
+ var candidateIndex = (attempt - 1) % candidates.Count;
+ var mq = candidates[candidateIndex];
+ if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType))
{
- Logger.Info(
- $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
- $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+ throw new ArgumentException(
+ "Current message type does not match with the accept message types," +
+ $" topic={message.Topic}, actualMessageType={message.MessageType}" +
+ $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
}
- return sendReceipt;
- }
- catch (Exception e)
- {
- // Isolate current endpoints.
- Isolated[endpoints] = true;
- if (attempt >= maxAttempts)
+ var sendMessageRequest = WrapSendMessageRequest(message, mq);
+ var endpoints = mq.Broker.Endpoints;
+ try
{
- Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
- $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
- $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
- throw;
+ var invocation =
+ await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+ var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+
+ var sendReceipt = sendReceipts.First();
+ if (attempt > 1)
+ {
+ Logger.Info(
+ $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+ $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+ }
+
+ return sendReceipt;
}
+ catch (Exception e)
+ {
+ // Isolate current endpoints.
+ Isolated[endpoints] = true;
+ if (attempt >= maxAttempts)
+ {
+ Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
+ $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
+ $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+ throw;
+ }
- Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
- $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
- $" clientId={ClientId}");
- throw;
+ if (MessageType.Transaction == message.MessageType)
+ {
+ Logger.Error(e, "Failed to send transaction message, run out of attempt times, " +
+ $"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " +
+ $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+ throw;
+ }
+
+ exception = e;
+ if (exception is not TooManyRequestsException)
+ {
+ // Retry immediately if the request is not throttled.
+ Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
+ $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
+ $" clientId={ClientId}");
+ continue;
+ }
+
+ var nextAttempt = 1 + attempt;
+ var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
+ await Task.Delay(delay);
+ Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " +
+ $"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " +
+ $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+ }
+ finally
+ {
+ var elapsed = stopwatch.Elapsed.Milliseconds;
+ _sendCostTimeHistogram.Record(elapsed,
+ new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
+ new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
+ new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
+ null == exception ? MetricConstant.Success : MetricConstant.Failure));
+ }
}
+
+ throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
+ exception);
}
internal override Settings GetSettings()
diff --git a/diff_sec.zip b/diff_sec.zip
new file mode 100644
index 00000000..89278e56
Binary files /dev/null and b/diff_sec.zip differ