You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:44 UTC

[rocketmq-clients] 04/28: Add more logs

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 8417c0a9beaf055b17073c79e7db7008a9c87b23
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Feb 7 19:03:30 2023 +0800

    Add more logs
---
 csharp/examples/ProducerNormalMessageExample.cs    |  4 ++-
 csharp/rocketmq-client-csharp/Client.cs            | 38 ++++++++++++++++++----
 .../ClientLoggerInterceptor.cs                     |  4 +--
 csharp/rocketmq-client-csharp/Endpoints.cs         |  7 +++-
 csharp/rocketmq-client-csharp/Producer.cs          | 15 ++++-----
 csharp/rocketmq-client-csharp/TopicRouteData.cs    | 26 ++++++++++-----
 6 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index c22bc695..1f80671c 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -17,6 +17,7 @@
 
 using System.Collections.Generic;
 using System.Text;
+using System.Threading;
 using System.Threading.Tasks;
 using NLog;
 using Org.Apache.Rocketmq;
@@ -64,8 +65,9 @@ namespace examples
             };
             var sendReceipt = await producer.Send(message);
             Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+            Thread.Sleep(9999999);
             // Close the producer if you don't need it anymore.
-            await producer.Shutdown();
+            // await producer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index c4392e2d..a1c4b821 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -181,6 +181,7 @@ namespace Org.Apache.Rocketmq
 
         private async void UpdateTopicRouteCache()
         {
+            Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
             foreach (var topic in Topics.Keys)
             {
                 var topicRouteData = await FetchTopicRoute(topic);
@@ -230,9 +231,17 @@ namespace Org.Apache.Rocketmq
                 Endpoints = ClientConfig.Endpoints.ToProtobuf()
             };
 
-            var queryRouteResponse =
+            var response =
                 await Manager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout);
-            var messageQueues = queryRouteResponse.MessageQueues.ToList();
+            var code = response.Status.Code;
+            if (!Proto.Code.Ok.Equals(code))
+            {
+                Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
+                             $"statusMessage={response.Status.Message}");
+            }
+            StatusChecker.Check(response.Status, request);
+
+            var messageQueues = response.MessageQueues.ToList();
             return new TopicRouteData(messageQueues);
         }
 
@@ -246,11 +255,26 @@ namespace Org.Apache.Rocketmq
             }
 
             var request = WrapHeartbeatRequest();
-
-            var tasks = endpoints.Select(endpoint => Manager.Heartbeat(endpoint, request, ClientConfig.RequestTimeout))
-                .Cast<Task>().ToList();
-
-            await Task.WhenAll(tasks);
+            Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new ();
+            // Collect task into a map.
+            foreach (var item in endpoints)
+            {
+                var task = Manager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+                responses[item]= task;
+            }
+            foreach (var item in responses.Keys)
+            {
+                var response = await responses[item];
+                var code = response.Status.Code;
+                
+                if (code.Equals(Proto.Code.Ok))
+                {
+                    Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
+                    return;
+                }
+                var statusMessage = response.Status.Message;
+                Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+            }
         }
 
 
diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 01adddc8..d9622291 100644
--- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -56,7 +56,7 @@ namespace Org.Apache.Rocketmq
             try
             {
                 var response = await t;
-                Logger.Debug($"Response received: {response}");
+                Logger.Trace($"Response received: {response}");
                 return response;
             }
             catch (Exception ex)
@@ -101,7 +101,7 @@ namespace Org.Apache.Rocketmq
             where TRequest : class
             where TResponse : class
         {
-            Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
+            Logger.Trace($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
         }
 
         private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs
index 3228881c..e7cf5f9c 100644
--- a/csharp/rocketmq-client-csharp/Endpoints.cs
+++ b/csharp/rocketmq-client-csharp/Endpoints.cs
@@ -67,6 +67,11 @@ namespace Org.Apache.Rocketmq
             Addresses = addresses;
         }
 
+        public override string ToString()
+        {
+            return GrpcTarget;
+        }
+
         public string GrpcTarget
         {
             // TODO
@@ -82,7 +87,7 @@ namespace Org.Apache.Rocketmq
                 return "";
             }
         }
-
+        
         public bool Equals(Endpoints other)
         {
             if (ReferenceEquals(null, other))
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 9fa10ad3..56ec5b40 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -19,7 +19,12 @@ namespace Org.Apache.Rocketmq
         {
         }
 
-        public Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
+        public Producer(ClientConfig clientConfig, int maxAttempts) : this(clientConfig,
+            new ConcurrentDictionary<string, bool>(), maxAttempts)
+        {
+        }
+
+        private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
             base(clientConfig, topics)
         {
             var retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
@@ -72,12 +77,6 @@ namespace Org.Apache.Rocketmq
             if (!_publishingRouteDataCache.TryGetValue(message.Topic, out var publishingLoadBalancer))
             {
                 var topicRouteData = await FetchTopicRoute(message.Topic);
-                if (null == topicRouteData || null == topicRouteData.MessageQueues ||
-                    0 == topicRouteData.MessageQueues.Count)
-                {
-                    throw new TopicRouteException($"No topic route for {message.Topic}");
-                }
-
                 publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
                 _publishingRouteDataCache.TryAdd(message.Topic, publishingLoadBalancer);
             }
@@ -87,7 +86,7 @@ namespace Org.Apache.Rocketmq
             var maxAttempts = retryPolicy.getMaxAttempts();
             var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts);
             Exception exception = null;
-            for (int attempt = 0; attempt < maxAttempts; attempt++)
+            for (var attempt = 0; attempt < maxAttempts; attempt++)
             {
                 try
                 {
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 05418eaa..2be2b9a5 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -26,7 +26,7 @@ namespace Org.Apache.Rocketmq
     {
         public TopicRouteData(List<rmq::MessageQueue> messageQueues)
         {
-            List<MessageQueue> messageQueuesList = new List<MessageQueue>();
+            var messageQueuesList = new List<MessageQueue>();
             foreach (var mq in messageQueues)
             {
                 messageQueuesList.Add(new MessageQueue(mq));
@@ -40,17 +40,27 @@ namespace Org.Apache.Rocketmq
 
         public bool Equals(TopicRouteData other)
         {
-            if (ReferenceEquals(null, other)) return false;
-            if (ReferenceEquals(this, other)) return true;
-            return Equals(MessageQueues, other.MessageQueues);
+            if (ReferenceEquals(null, other))
+            {
+                return false;
+            }
+
+            return ReferenceEquals(this, other) || Equals(MessageQueues, other.MessageQueues);
         }
 
         public override bool Equals(object obj)
         {
-            if (ReferenceEquals(null, obj)) return false;
-            if (ReferenceEquals(this, obj)) return true;
-            if (obj.GetType() != this.GetType()) return false;
-            return Equals((TopicRouteData)obj);
+            if (ReferenceEquals(null, obj))
+            {
+                return false;
+            }
+
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            return obj.GetType() == GetType() && Equals((TopicRouteData)obj);
         }
 
         public override int GetHashCode()