You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:44 UTC
[rocketmq-clients] 04/28: Add more logs
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8417c0a9beaf055b17073c79e7db7008a9c87b23
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Feb 7 19:03:30 2023 +0800
Add more logs
---
csharp/examples/ProducerNormalMessageExample.cs | 4 ++-
csharp/rocketmq-client-csharp/Client.cs | 38 ++++++++++++++++++----
.../ClientLoggerInterceptor.cs | 4 +--
csharp/rocketmq-client-csharp/Endpoints.cs | 7 +++-
csharp/rocketmq-client-csharp/Producer.cs | 15 ++++-----
csharp/rocketmq-client-csharp/TopicRouteData.cs | 26 ++++++++++-----
6 files changed, 67 insertions(+), 27 deletions(-)
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index c22bc695..1f80671c 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -17,6 +17,7 @@
using System.Collections.Generic;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using NLog;
using Org.Apache.Rocketmq;
@@ -64,8 +65,9 @@ namespace examples
};
var sendReceipt = await producer.Send(message);
Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+ Thread.Sleep(9999999);
// Close the producer if you don't need it anymore.
- await producer.Shutdown();
+ // await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index c4392e2d..a1c4b821 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -181,6 +181,7 @@ namespace Org.Apache.Rocketmq
private async void UpdateTopicRouteCache()
{
+ Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
foreach (var topic in Topics.Keys)
{
var topicRouteData = await FetchTopicRoute(topic);
@@ -230,9 +231,17 @@ namespace Org.Apache.Rocketmq
Endpoints = ClientConfig.Endpoints.ToProtobuf()
};
- var queryRouteResponse =
+ var response =
await Manager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout);
- var messageQueues = queryRouteResponse.MessageQueues.ToList();
+ var code = response.Status.Code;
+ if (!Proto.Code.Ok.Equals(code))
+ {
+ Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
+ $"statusMessage={response.Status.Message}");
+ }
+ StatusChecker.Check(response.Status, request);
+
+ var messageQueues = response.MessageQueues.ToList();
return new TopicRouteData(messageQueues);
}
@@ -246,11 +255,26 @@ namespace Org.Apache.Rocketmq
}
var request = WrapHeartbeatRequest();
-
- var tasks = endpoints.Select(endpoint => Manager.Heartbeat(endpoint, request, ClientConfig.RequestTimeout))
- .Cast<Task>().ToList();
-
- await Task.WhenAll(tasks);
+ Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new ();
+ // Collect task into a map.
+ foreach (var item in endpoints)
+ {
+ var task = Manager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+ responses[item]= task;
+ }
+ foreach (var item in responses.Keys)
+ {
+ var response = await responses[item];
+ var code = response.Status.Code;
+
+ if (code.Equals(Proto.Code.Ok))
+ {
+ Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
+ return;
+ }
+ var statusMessage = response.Status.Message;
+ Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+ }
}
diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 01adddc8..d9622291 100644
--- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -56,7 +56,7 @@ namespace Org.Apache.Rocketmq
try
{
var response = await t;
- Logger.Debug($"Response received: {response}");
+ Logger.Trace($"Response received: {response}");
return response;
}
catch (Exception ex)
@@ -101,7 +101,7 @@ namespace Org.Apache.Rocketmq
where TRequest : class
where TResponse : class
{
- Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
+ Logger.Trace($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
}
private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs
index 3228881c..e7cf5f9c 100644
--- a/csharp/rocketmq-client-csharp/Endpoints.cs
+++ b/csharp/rocketmq-client-csharp/Endpoints.cs
@@ -67,6 +67,11 @@ namespace Org.Apache.Rocketmq
Addresses = addresses;
}
+ public override string ToString()
+ {
+ return GrpcTarget;
+ }
+
public string GrpcTarget
{
// TODO
@@ -82,7 +87,7 @@ namespace Org.Apache.Rocketmq
return "";
}
}
-
+
public bool Equals(Endpoints other)
{
if (ReferenceEquals(null, other))
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 9fa10ad3..56ec5b40 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -19,7 +19,12 @@ namespace Org.Apache.Rocketmq
{
}
- public Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
+ public Producer(ClientConfig clientConfig, int maxAttempts) : this(clientConfig,
+ new ConcurrentDictionary<string, bool>(), maxAttempts)
+ {
+ }
+
+ private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
base(clientConfig, topics)
{
var retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
@@ -72,12 +77,6 @@ namespace Org.Apache.Rocketmq
if (!_publishingRouteDataCache.TryGetValue(message.Topic, out var publishingLoadBalancer))
{
var topicRouteData = await FetchTopicRoute(message.Topic);
- if (null == topicRouteData || null == topicRouteData.MessageQueues ||
- 0 == topicRouteData.MessageQueues.Count)
- {
- throw new TopicRouteException($"No topic route for {message.Topic}");
- }
-
publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
_publishingRouteDataCache.TryAdd(message.Topic, publishingLoadBalancer);
}
@@ -87,7 +86,7 @@ namespace Org.Apache.Rocketmq
var maxAttempts = retryPolicy.getMaxAttempts();
var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts);
Exception exception = null;
- for (int attempt = 0; attempt < maxAttempts; attempt++)
+ for (var attempt = 0; attempt < maxAttempts; attempt++)
{
try
{
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 05418eaa..2be2b9a5 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -26,7 +26,7 @@ namespace Org.Apache.Rocketmq
{
public TopicRouteData(List<rmq::MessageQueue> messageQueues)
{
- List<MessageQueue> messageQueuesList = new List<MessageQueue>();
+ var messageQueuesList = new List<MessageQueue>();
foreach (var mq in messageQueues)
{
messageQueuesList.Add(new MessageQueue(mq));
@@ -40,17 +40,27 @@ namespace Org.Apache.Rocketmq
public bool Equals(TopicRouteData other)
{
- if (ReferenceEquals(null, other)) return false;
- if (ReferenceEquals(this, other)) return true;
- return Equals(MessageQueues, other.MessageQueues);
+ if (ReferenceEquals(null, other))
+ {
+ return false;
+ }
+
+ return ReferenceEquals(this, other) || Equals(MessageQueues, other.MessageQueues);
}
public override bool Equals(object obj)
{
- if (ReferenceEquals(null, obj)) return false;
- if (ReferenceEquals(this, obj)) return true;
- if (obj.GetType() != this.GetType()) return false;
- return Equals((TopicRouteData)obj);
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ return obj.GetType() == GetType() && Equals((TopicRouteData)obj);
}
public override int GetHashCode()