You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:59 UTC
[rocketmq-clients] 19/28: Polish code
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d70721197113c66f063acd40efa64395569236d4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Feb 17 11:57:51 2023 +0800
Polish code
---
csharp/examples/ProducerBenchmark.cs | 7 +--
csharp/rocketmq-client-csharp/Client.cs | 86 +++++++++++++++++----------
csharp/rocketmq-client-csharp/MqLogManager.cs | 13 ++--
csharp/rocketmq-client-csharp/Session.cs | 9 ++-
csharp/rocketmq-client-csharp/State.cs | 24 ++++++++
5 files changed, 90 insertions(+), 49 deletions(-)
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 3918666d..6f94028e 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -68,14 +68,14 @@ namespace examples
Keys = keys
};
- const int tpsLimit = 500;
+ const int tpsLimit = 1;
Task.Run(async () =>
{
while (true)
{
_semaphore.Release(tpsLimit);
- await Task.Delay(TimeSpan.FromMilliseconds(1000));
+ await Task.Delay(TimeSpan.FromSeconds(1));
}
});
@@ -83,8 +83,7 @@ namespace examples
{
while (true)
{
- Logger.Info($"Send {_counter} messages successfully.");
- Interlocked.Exchange(ref _counter, 0);
+ Logger.Info($"Send {Interlocked.Exchange(ref _counter, 0)} messages successfully.");
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 9311b48a..5a2cd69f 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -40,11 +40,11 @@ namespace Org.Apache.Rocketmq
private readonly CancellationTokenSource _topicRouteUpdateCts;
private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
- private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
+ private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1);
private readonly CancellationTokenSource _settingsSyncCts;
- private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1);
- private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(1);
+ private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60);
+ private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(60);
private readonly CancellationTokenSource _statsCts;
protected readonly ClientConfig ClientConfig;
@@ -201,6 +201,26 @@ namespace Org.Apache.Rocketmq
try
{
Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
+ Dictionary<string, Task<TopicRouteData>> responses = new();
+
+ foreach (var topic in GetTopics())
+ {
+ var task = FetchTopicRoute(topic);
+ responses[topic] = task;
+ }
+
+ foreach (var item in responses.Keys)
+ {
+ try
+ {
+ await responses[item];
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, $"Failed to update topic route cache, topic={item}");
+ }
+ }
+
foreach (var topic in GetTopics())
{
await FetchTopicRoute(topic);
@@ -208,8 +228,8 @@ namespace Org.Apache.Rocketmq
}
catch (Exception e)
{
- Logger.Error(e,
- $"[Bug] unexpected exception raised during topic route cache update, clientId={ClientId}");
+ Logger.Error(e, $"[Bug] unexpected exception raised during topic route cache update, " +
+ $"clientId={ClientId}");
}
}
@@ -218,9 +238,12 @@ namespace Org.Apache.Rocketmq
try
{
var totalRouteEndpoints = GetTotalRouteEndpoints();
- foreach (var (_, session) in totalRouteEndpoints.Select(GetSession))
+ foreach (var endpoints in totalRouteEndpoints)
{
+ var (_, session) = GetSession(endpoints);
await session.SyncSettings(false);
+ Logger.Info($"Sync settings to remote, endpoints={endpoints}");
+
}
}
catch (Exception e)
@@ -319,38 +342,37 @@ namespace Org.Apache.Rocketmq
// Collect task into a map.
foreach (var item in endpoints)
{
- try
- {
- var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
- responses[item] = task;
- }
- catch (Exception e)
- {
- Logger.Error(e, $"Failed to send heartbeat, endpoints={item}");
- }
+ var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+ responses[item] = task;
}
-
foreach (var item in responses.Keys)
{
- var response = await responses[item];
- var code = response.Status.Code;
-
- if (code.Equals(Proto.Code.Ok))
+ try
{
- Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
- if (Isolated.TryRemove(item, out _))
+ var response = await responses[item];
+ var code = response.Status.Code;
+
+ if (code.Equals(Proto.Code.Ok))
{
- Logger.Info(
- $"Rejoin endpoints which was isolated before, endpoints={item}, clientId={ClientId}");
+ Logger.Info($"Send heartbeat successfully, endpoints={item}, clientId={ClientId}");
+ if (Isolated.TryRemove(item, out _))
+ {
+ Logger.Info($"Rejoin endpoints which was isolated before, endpoints={item}, " +
+ $"clientId={ClientId}");
+ }
+
+ return;
}
- return;
+ var statusMessage = response.Status.Message;
+ Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, " +
+ $"statusMessage={statusMessage}, clientId={ClientId}");
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, $"Failed to send heartbeat, endpoints={item}");
}
-
- var statusMessage = response.Status.Message;
- Logger.Info(
- $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
}
}
catch (Exception e)
@@ -421,7 +443,7 @@ namespace Org.Apache.Rocketmq
Status = status
};
var (_, session) = GetSession(endpoints);
- await session.write(telemetryCommand);
+ await session.WriteAsync(telemetryCommand);
}
public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
@@ -439,7 +461,7 @@ namespace Org.Apache.Rocketmq
Status = status
};
var (_, session) = GetSession(endpoints);
- await session.write(telemetryCommand);
+ await session.WriteAsync(telemetryCommand);
}
public async void OnPrintThreadStackTraceCommand(Endpoints endpoints,
@@ -457,7 +479,7 @@ namespace Org.Apache.Rocketmq
Status = status
};
var (_, session) = GetSession(endpoints);
- await session.write(telemetryCommand);
+ await session.WriteAsync(telemetryCommand);
}
public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs
index 7fa2b7bf..6b117ea5 100644
--- a/csharp/rocketmq-client-csharp/MqLogManager.cs
+++ b/csharp/rocketmq-client-csharp/MqLogManager.cs
@@ -28,22 +28,19 @@ namespace Org.Apache.Rocketmq
*
* Configure component logging, please refer to https://github.com/NLog/NLog/wiki/Configure-component-logging
*/
- public class MqLogManager
+ public static class MqLogManager
{
- public static LogFactory Instance
- {
- get { return LazyInstance.Value; }
- }
+ public static LogFactory Instance => LazyInstance.Value;
private static readonly Lazy<LogFactory> LazyInstance = new(BuildLogFactory);
private static LogFactory BuildLogFactory()
{
// Use name of current assembly to construct NLog config filename
- Assembly thisAssembly = Assembly.GetExecutingAssembly();
- string configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog");
+ var thisAssembly = Assembly.GetExecutingAssembly();
+ var configFilePath = Path.ChangeExtension(thisAssembly.Location, ".nlog");
- LogFactory logFactory = new LogFactory();
+ var logFactory = new LogFactory();
logFactory.Configuration = new XmlLoggingConfiguration(configFilePath, logFactory);
return logFactory;
}
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index 0d35be0a..92465291 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3);
- private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
+ private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
_streamingCall;
private readonly IClient _client;
@@ -56,26 +56,25 @@ namespace Org.Apache.Rocketmq
Loop();
}
- public async Task write(Proto.TelemetryCommand telemetryCommand)
+ public async Task WriteAsync(Proto.TelemetryCommand telemetryCommand)
{
var writer = _streamingCall.RequestStream;
await writer.WriteAsync(telemetryCommand);
}
+ // TODO: Test concurrency.
public async Task SyncSettings(bool awaitResp)
{
// Add more buffer time.
await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
try
{
- var writer = _streamingCall.RequestStream;
- // await readTask;
var settings = _client.GetSettings();
var telemetryCommand = new Proto.TelemetryCommand
{
Settings = settings.ToProtobuf()
};
- await writer.WriteAsync(telemetryCommand);
+ await WriteAsync(telemetryCommand);
// await writer.CompleteAsync();
if (awaitResp)
{
diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs
new file mode 100644
index 00000000..1dbd6b30
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/State.cs
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Org.Apache.Rocketmq
+{
+ public enum State
+ {
+
+ }
+}
\ No newline at end of file