You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/06 04:54:45 UTC

[rocketmq-clients] 04/05: Adjust settings sync frequency

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit c8af7a4c41fd000a129b5bf4311f8cd297677c18
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Mar 6 11:30:14 2023 +0800

    Adjust settings sync frequency
---
 csharp/rocketmq-client-csharp/Client.cs  |  7 +------
 csharp/rocketmq-client-csharp/Session.cs | 13 ++++---------
 2 files changed, 5 insertions(+), 15 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index e78a8651..b3a38dc6 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -40,7 +40,7 @@ namespace Org.Apache.Rocketmq
         private readonly CancellationTokenSource _topicRouteUpdateCts;
 
         private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
-        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromSeconds(1);
+        private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
         private readonly CancellationTokenSource _settingsSyncCts;
 
         private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(60);
@@ -227,11 +227,6 @@ namespace Org.Apache.Rocketmq
                         Logger.Error(e, $"Failed to update topic route cache, topic={item}");
                     }
                 }
-
-                foreach (var topic in GetTopics())
-                {
-                    await FetchTopicRoute(topic);
-                }
             }
             catch (Exception e)
             {
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index c6e98418..86316b9d 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -17,7 +17,6 @@
 
 using System;
 using System.Threading;
-using System.Threading.Channels;
 using System.Threading.Tasks;
 using Grpc.Core;
 using grpc = Grpc.Core;
@@ -32,12 +31,12 @@ namespace Org.Apache.Rocketmq
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         private static readonly TimeSpan SettingsInitializationTimeout = TimeSpan.FromSeconds(3);
+        private readonly ManualResetEventSlim _event = new(false);
 
         private readonly AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
             _streamingCall;
 
         private readonly Client _client;
-        private readonly Channel<bool> _channel;
         private readonly Endpoints _endpoints;
         private readonly SemaphoreSlim _semaphore;
 
@@ -49,10 +48,6 @@ namespace Org.Apache.Rocketmq
             _semaphore = new SemaphoreSlim(1);
             _streamingCall = streamingCall;
             _client = client;
-            _channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
-            {
-                FullMode = BoundedChannelFullMode.DropOldest
-            });
             Loop();
         }
 
@@ -66,7 +61,7 @@ namespace Org.Apache.Rocketmq
         public async Task SyncSettings(bool awaitResp)
         {
             // Add more buffer time.
-            await _semaphore.WaitAsync(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
+            await _semaphore.WaitAsync();
             try
             {
                 var settings = _client.GetSettings();
@@ -78,7 +73,7 @@ namespace Org.Apache.Rocketmq
                 // await writer.CompleteAsync();
                 if (awaitResp)
                 {
-                    await _channel.Reader.ReadAsync();
+                    _event.Wait(_client.GetClientConfig().RequestTimeout.Add(SettingsInitializationTimeout));
                 }
             }
             finally
@@ -100,7 +95,7 @@ namespace Org.Apache.Rocketmq
                             Logger.Info(
                                 $"Receive setting from remote, endpoints={_endpoints}, clientId={_client.GetClientId()}");
                             _client.OnSettingsCommand(_endpoints, response.Settings);
-                            await _channel.Writer.WriteAsync(true);
+                            _event.Set();
                             break;
                         }
                         case Proto.TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand: