You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/06 04:54:45 UTC
[rocketmq-clients] 04/05: Adjust settings sync frequency
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit c8af7a4c41fd000a129b5bf4311f8cd297677c18
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Mar 6 11:30:14 2023 +0800
Adjust settings sync frequency
---
csharp/rocketmq-client-csharp/Client.cs | 7 +------
csharp/rocketmq-client-csharp/Session.cs | 13 ++++---------
2 files changed, 5 insertions(+), 15 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index e78a8651..b3a38dc6 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -40,7 +40,7 @@ namespace Org.Apache.Rocketmq
private readonly CancellationTokenSource _topicRouteUpdateCts;
private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
- private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1);
+ private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
private readonly CancellationTokenSource _settingsSyncCts;
private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60);
@@ -227,11 +227,6 @@ namespace Org.Apache.Rocketmq
Logger.Error(e, $"Failed to update topic route cache, topic={item}");
}
}
-
- foreach (var topic in GetTopics())
- {
- await FetchTopicRoute(topic);
- }
}
catch (Exception e)
{
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index c6e98418..86316b9d 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -17,7 +17,6 @@
using System;
using System.Threading;
-using System.Threading.Channels;
using System.Threading.Tasks;
using Grpc.Core;
using grpc = Grpc.Core;
@@ -32,12 +31,12 @@ namespace Org.Apache.Rocketmq
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3);
+ private readonly ManualResetEventSlim _event = new(false);
private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
_streamingCall;
private readonly Client _client;
- private readonly Channel<bool> _channel;
private readonly Endpoints _endpoints;
private readonly SemaphoreSlim _semaphore;
@@ -49,10 +48,6 @@ namespace Org.Apache.Rocketmq
_semaphore = new SemaphoreSlim(1);
_streamingCall = streamingCall;
_client = client;
- _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
- {
- FullMode = BoundedChannelFullMode.DropOldest
- });
Loop();
}
@@ -66,7 +61,7 @@ namespace Org.Apache.Rocketmq
public async Task SyncSettings(bool awaitResp)
{
// Add more buffer time.
- await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
+ await _semaphore.WaitAsync();
try
{
var settings = _client.GetSettings();
@@ -78,7 +73,7 @@ namespace Org.Apache.Rocketmq
// await writer.CompleteAsync();
if (awaitResp)
{
- await _channel.Reader.ReadAsync();
+ _event.Wait(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
}
}
finally
@@ -100,7 +95,7 @@ namespace Org.Apache.Rocketmq
Logger.Info(
$"Receive setting from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}");
_client.OnSettingsCommand(_endpoints, response.Settings);
- await _channel.Writer.WriteAsync(true);
+ _event.Set();
break;
}
case Proto.TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand: