You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:59 UTC

[rocketmq-clients] 19/28: Polish code

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit d70721197113c66f063acd40efa64395569236d4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Feb 17 11:57:51 2023 +0800

    Polish code
---
 csharp/examples/ProducerBenchmark.cs          |  7 +--
 csharp/rocketmq-client-csharp/Client.cs       | 86 +++++++++++++++++----------
 csharp/rocketmq-client-csharp/MqLogManager.cs | 13 ++--
 csharp/rocketmq-client-csharp/Session.cs      |  9 ++-
 csharp/rocketmq-client-csharp/State.cs        | 24 ++++++++
 5 files changed, 90 insertions(+), 49 deletions(-)

diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 3918666d..6f94028e 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -68,14 +68,14 @@ namespace examples
                 Keys = keys
             };
 
-            const int tpsLimit = 500;
+            const int tpsLimit = 1;
 
             Task.Run(async () =>
             {
                 while (true)
                 {
                     _semaphore.Release(tpsLimit);
-                    await Task.Delay(TimeSpan.FromMilliseconds(1000));
+                    await Task.Delay(TimeSpan.FromSeconds(1));
                 }
             });
 
@@ -83,8 +83,7 @@ namespace examples
             {
                 while (true)
                 {
-                    Logger.Info($"Send {_counter} messages successfully.");
-                    Interlocked.Exchange(ref _counter, 0);
+                    Logger.Info($"Send {Interlocked.Exchange(ref _counter, 0)} messages successfully.");
                     await Task.Delay(TimeSpan.FromSeconds(1));
                 }
             });
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 9311b48a..5a2cd69f 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -40,11 +40,11 @@ namespace Org.Apache.Rocketmq
         private readonly CancellationTokenSource _topicRouteUpdateCts;
 
         private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
-        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
+        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1);
         private readonly CancellationTokenSource _settingsSyncCts;
         
-        private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1);
-        private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60);
+        private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(60);
         private readonly CancellationTokenSource _statsCts;
 
         protected readonly ClientConfig ClientConfig;
@@ -201,6 +201,26 @@ namespace Org.Apache.Rocketmq
             try
             {
                 Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
+                Dictionary<string, Task<TopicRouteData>> responses = new();
+                
+                foreach (var topic in GetTopics())
+                {
+                    var task = FetchTopicRoute(topic);
+                    responses[topic] = task;
+                }
+                
+                foreach (var item in responses.Keys)
+                {
+                    try
+                    {
+                        await responses[item];
+                    }
+                    catch (Exception e)
+                    {
+                        Logger.Error(e, $"Failed to update topic route cache, topic={item}");
+                    }
+                }
+                
                 foreach (var topic in GetTopics())
                 {
                     await FetchTopicRoute(topic);
@@ -208,8 +228,8 @@ namespace Org.Apache.Rocketmq
             }
             catch (Exception e)
             {
-                Logger.Error(e,
-                    $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}");
+                Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, " +
+                                $"clientId={ClientId}");
             }
         }
 
@@ -218,9 +238,12 @@ namespace Org.Apache.Rocketmq
             try
             {
                 var totalRouteEndpoints = GetTotalRouteEndpoints();
-                foreach (var (_, session) in totalRouteEndpoints.Select(GetSession))
+                foreach (var endpoints in totalRouteEndpoints)
                 {
+                    var (_, session) = GetSession(endpoints);
                     await session.SyncSettings(false);
+                    Logger.Info($"Sync settings to remote, endpoints={endpoints}");
+
                 }
             }
             catch (Exception e)
@@ -319,38 +342,37 @@ namespace Org.Apache.Rocketmq
                 // Collect task into a map.
                 foreach (var item in endpoints)
                 {
-                    try
-                    {
-                        var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
-                        responses[item] = task;
-                    }
-                    catch (Exception e)
-                    {
-                        Logger.Error(e, $"Failed to send heartbeat, endpoints={item}");
-                    }
+                    var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+                    responses[item] = task;
                 }
 
-
                 foreach (var item in responses.Keys)
                 {
-                    var response = await responses[item];
-                    var code = response.Status.Code;
-
-                    if (code.Equals(Proto.Code.Ok))
+                    try
                     {
-                        Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
-                        if (Isolated.TryRemove(item, out _))
+                        var response = await responses[item];
+                        var code = response.Status.Code;
+
+                        if (code.Equals(Proto.Code.Ok))
                         {
-                            Logger.Info(
-                                $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}");
+                            Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
+                            if (Isolated.TryRemove(item, out _))
+                            {
+                                Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, " +
+                                            $"clientId={ClientId}");
+                            }
+
+                            return;
                         }
 
-                        return;
+                        var statusMessage = response.Status.Message;
+                        Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, " +
+                                    $"statusMessage={statusMessage}, clientId={ClientId}");
+                    }
+                    catch (Exception e)
+                    {
+                        Logger.Error(e, $"Failed to send heartbeat, endpoints={item}");
                     }
-
-                    var statusMessage = response.Status.Message;
-                    Logger.Info(
-                        $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
                 }
             }
             catch (Exception e)
@@ -421,7 +443,7 @@ namespace Org.Apache.Rocketmq
                 Status = status
             };
             var (_, session) = GetSession(endpoints);
-            await session.write(telemetryCommand);
+            await session.WriteAsync(telemetryCommand);
         }
 
         public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
@@ -439,7 +461,7 @@ namespace Org.Apache.Rocketmq
                 Status = status
             };
             var (_, session) = GetSession(endpoints);
-            await session.write(telemetryCommand);
+            await session.WriteAsync(telemetryCommand);
         }
 
         public async void OnPrintThreadStackTraceCommand(Endpoints endpoints,
@@ -457,7 +479,7 @@ namespace Org.Apache.Rocketmq
                 Status = status
             };
             var (_, session) = GetSession(endpoints);
-            await session.write(telemetryCommand);
+            await session.WriteAsync(telemetryCommand);
         }
 
         public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs
index 7fa2b7bf..6b117ea5 100644
--- a/csharp/rocketmq-client-csharp/MqLogManager.cs
+++ b/csharp/rocketmq-client-csharp/MqLogManager.cs
@@ -28,22 +28,19 @@ namespace Org.Apache.Rocketmq
      *
      * Configure component logging, please refer to https://github.com/NLog/NLog/wiki/Configure-component-logging
      */
-    public class MqLogManager
+    public static class MqLogManager
     {
-        public static LogFactory Instance
-        {
-            get { return LazyInstance.Value; }
-        }
+        public static LogFactory Instance => LazyInstance.Value;
 
         private static readonly Lazy<LogFactory> LazyInstance = new(BuildLogFactory);
 
         private static LogFactory BuildLogFactory()
         {
             // Use name of current assembly to construct NLog config filename 
-            Assembly thisAssembly = Assembly.GetExecutingAssembly();
-            string configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog");
+            var thisAssembly = Assembly.GetExecutingAssembly();
+            var configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog");
 
-            LogFactory logFactory = new LogFactory();
+            var logFactory = new LogFactory();
             logFactory.Configuration = new XmlLoggingConfiguration(configFilePath, logFactory);
             return logFactory;
         }
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index 0d35be0a..92465291 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
 
         private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3);
 
-        private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
+        private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
             _streamingCall;
 
         private readonly IClient _client;
@@ -56,26 +56,25 @@ namespace Org.Apache.Rocketmq
             Loop();
         }
 
-        public async Task write(Proto.TelemetryCommand telemetryCommand)
+        public async Task WriteAsync(Proto.TelemetryCommand telemetryCommand)
         {
             var writer = _streamingCall.RequestStream;
             await writer.WriteAsync(telemetryCommand);
         }
 
+        // TODO: Test concurrency.
         public async Task SyncSettings(bool awaitResp)
         {
             // Add more buffer time.
             await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
             try
             {
-                var writer = _streamingCall.RequestStream;
-                // await readTask;
                 var settings = _client.GetSettings();
                 var telemetryCommand = new Proto.TelemetryCommand
                 {
                     Settings = settings.ToProtobuf()
                 };
-                await writer.WriteAsync(telemetryCommand);
+                await WriteAsync(telemetryCommand);
                 // await writer.CompleteAsync();
                 if (awaitResp)
                 {
diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs
new file mode 100644
index 00000000..1dbd6b30
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/State.cs
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+    public enum State
+    {
+        
+    }
+}
\ No newline at end of file