You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 09:10:11 UTC

[rocketmq-clients] branch csharp_dev updated: WIP: bugfix

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/csharp_dev by this push:
     new ff2ff51  WIP: bugfix
ff2ff51 is described below

commit ff2ff51f25561acd72a7b1b2f43752abf8aefbfb
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Sep 1 17:10:01 2022 +0800

    WIP: bugfix
---
 csharp/examples/Program.cs                      |  46 ++++----
 csharp/rocketmq-client-csharp/AccessPoint.cs    |  28 ++---
 csharp/rocketmq-client-csharp/Client.cs         |  83 ++++++++------
 csharp/rocketmq-client-csharp/ClientConfig.cs   |  24 +----
 csharp/rocketmq-client-csharp/IClientConfig.cs  |   4 -
 csharp/rocketmq-client-csharp/Producer.cs       |   4 +-
 csharp/rocketmq-client-csharp/RpcClient.cs      |   2 +-
 csharp/rocketmq-client-csharp/Session.cs        |  13 ++-
 csharp/rocketmq-client-csharp/Signature.cs      |  10 +-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs |  40 +++----
 csharp/tests/ClientManagerTest.cs               |   2 +-
 csharp/tests/ProducerTest.cs                    | 138 ++++++++++++------------
 csharp/tests/RpcClientTest.cs                   |   6 +-
 csharp/tests/SignatureTest.cs                   |  11 +-
 14 files changed, 202 insertions(+), 209 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index c15c6f1..f3c6027 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -123,36 +123,40 @@ namespace examples
         static async Task Main(string[] args)
         {
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            var producer = new Producer(ACCESS_URL)
+            // var producer = new Producer(ACCESS_URL)
+            // {
+            //     CredentialsProvider = credentialsProvider
+            // };
+            // producer.AddTopicOfInterest(STANDARD_TOPIC);
+            // producer.AddTopicOfInterest(FIFO_TOPIC);
+            // producer.AddTopicOfInterest(TIMED_TOPIC);
+            // producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+            //
+            // await producer.Start();
+            //
+            // var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
+            // Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
+            //
+            // var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
+            // Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
+            //
+            // var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
+            // Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
+            //
+            // await producer.Shutdown();
+
+            Console.WriteLine("Now start a simple consumer");
+            var simpleConsumer = new SimpleConsumer(ACCESS_URL, CONCURRENT_GROUP)
             {
                 CredentialsProvider = credentialsProvider
             };
-            producer.AddTopicOfInterest(STANDARD_TOPIC);
-            producer.AddTopicOfInterest(FIFO_TOPIC);
-            producer.AddTopicOfInterest(TIMED_TOPIC);
-            producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
-
-            await producer.Start();
-
-            var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
-            Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
-
-            var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
-            Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
             
-            var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
-            Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
-
-            await producer.Shutdown();
-
-            Console.WriteLine("Now start a simple consumer");
-            var simpleConsumer = new SimpleConsumer(ACCESS_URL, CONCURRENT_GROUP);
             simpleConsumer.Subscribe(STANDARD_TOPIC, new FilterExpression("*", ExpressionType.TAG));
             await simpleConsumer.Start();
 
             await ConsumeAndAckMessages(simpleConsumer);
 
-            await simpleConsumer.Shutdown();
+            // await simpleConsumer.Shutdown();
 
             Console.ReadKey();
         }
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs
index ab29273..f05fa29 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -37,35 +37,27 @@ namespace Org.Apache.Rocketmq
                 throw new ArgumentException("Access url should be of format host:port");
             }
 
-            _host = segments[0];
-            _port = Int32.Parse(segments[1]);
+            Host = segments[0];
+            Port = Int32.Parse(segments[1]);
         }
-        
-        private string _host;
 
-        public string Host
-        {
-            get { return _host; }
-            set { _host = value; }
-        }
+        public string Host { get; }
 
-        private int _port;
+        public int Port { get; set; }
 
-        public int Port
+        public string TargetUrl()
         {
-            get { return _port; }
-            set { _port = value; }
+            return $"https://{Host}:{Port}";
         }
 
-        public string TargetUrl()
+        public rmq::AddressScheme HostScheme()
         {
-            return $"https://{_host}:{_port}";
+            return SchemeOf(Host);
         }
 
-        public rmq::AddressScheme HostScheme()
+        private static rmq::AddressScheme SchemeOf(string host)
         {
-            IPAddress ip;
-            bool result = IPAddress.TryParse(_host, out ip);
+            var result = IPAddress.TryParse(host, out var ip);
             if (!result)
             {
                 return rmq::AddressScheme.DomainName;
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 4b4884d..675b9eb 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.Rocketmq
             {
                 AccessPoint = new rmq::Endpoints
                 {
-                    Scheme = rmq::AddressScheme.Ipv4
+                    Scheme = AccessPoint.HostScheme()
                 }
             };
 
@@ -66,7 +66,7 @@ namespace Org.Apache.Rocketmq
                 Hostname = System.Net.Dns.GetHostName()
             };
 
-            _manager = new ClientManager();
+            Manager = new ClientManager();
 
             _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
@@ -77,27 +77,28 @@ namespace Org.Apache.Rocketmq
         {
             Schedule(async () =>
             {
+                Logger.Debug("Update topic route by schedule");
                 await UpdateTopicRoute();
 
             }, 30, _updateTopicRouteCts.Token);
 
             // Get routes for topics of interest.
+            Logger.Debug("Step of #Start: get route for topics of interest");
             await UpdateTopicRoute();
 
             string accessPointUrl = AccessPoint.TargetUrl();
             CreateSession(accessPointUrl);
-
             await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
-
+            Logger.Debug($"Session has been created for {accessPointUrl}");
             await Heartbeat();
         }
 
         public virtual async Task Shutdown()
         {
-            Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
+            Logger.Info($"Shutdown client");
             _updateTopicRouteCts.Cancel();
             _telemetryCts.Cancel();
-            await _manager.Shutdown();
+            await Manager.Shutdown();
         }
 
         private string FilterBroker(Func<string, bool> acceptor)
@@ -106,7 +107,7 @@ namespace Org.Apache.Rocketmq
             {
                 foreach (var partition in item.Value.MessageQueues)
                 {
-                    string target = Utilities.TargetUrl(partition);
+                    var target = Utilities.TargetUrl(partition);
                     if (acceptor(target))
                     {
                         return target;
@@ -121,7 +122,7 @@ namespace Org.Apache.Rocketmq
          */
         private List<string> AvailableBrokerEndpoints()
         {
-            List<string> endpoints = new List<string>();
+            var endpoints = new List<string>();
             foreach (var item in _topicRouteTable)
             {
                 foreach (var partition in item.Value.MessageQueues)
@@ -154,7 +155,7 @@ namespace Org.Apache.Rocketmq
             List<string> topicList = new List<string>();
             topicList.AddRange(topics);
 
-            List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+            var tasks = new List<Task<TopicRouteData>>();
             foreach (var item in topicList)
             {
                 tasks.Add(GetRouteFor(item, true));
@@ -220,8 +221,10 @@ namespace Org.Apache.Rocketmq
          */
         protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
         {
+            Logger.Debug($"Get route for topic={topic}, direct={direct}");
             if (!direct && _topicRouteTable.ContainsKey(topic))
             {
+                Logger.Debug($"Return cached route for {topic}");
                 return _topicRouteTable[topic];
             }
 
@@ -244,7 +247,7 @@ namespace Org.Apache.Rocketmq
             }
 
             var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
             int index = _random.Next(0, AccessPointEndpoints.Count);
             var serviceEndpoint = AccessPointEndpoints[index];
             // AccessPointAddresses.Count
@@ -252,11 +255,12 @@ namespace Org.Apache.Rocketmq
             try
             {
                 Logger.Debug($"Resolving route for topic={topic}");
-                var topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
+                var topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
                 if (null != topicRouteData)
                 {
                     Logger.Debug($"Got route entries for {topic} from name server");
                     _topicRouteTable.TryAdd(topic, topicRouteData);
+                    Logger.Debug($"Got route for {topic} from {target}");
                     return topicRouteData;
                 }
                 Logger.Warn($"Failed to query route of {topic} from {target}");
@@ -288,12 +292,12 @@ namespace Org.Apache.Rocketmq
             PrepareHeartbeatData(request);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             List<Task> tasks = new List<Task>();
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(_manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
             }
 
             await Task.WhenAll(tasks);
@@ -339,8 +343,8 @@ namespace Org.Apache.Rocketmq
             try
             {
                 var metadata = new grpc::Metadata();
-                Signature.sign(this, metadata);
-                return await _manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
+                Signature.Sign(this, metadata);
+                return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
             }
             catch (System.Exception e)
             {
@@ -364,25 +368,22 @@ namespace Org.Apache.Rocketmq
             settings.MergeFrom(ClientSettings);
         }
 
-        private void CreateSession(string url)
+        private async Task CreateSession(string url)
         {
+            Logger.Debug($"Create session for url={url}");
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            var stream = _manager.Telemetry(url, metadata);
-            var session = new Session(stream, this);
+            Signature.Sign(this, metadata);
+            var stream = Manager.Telemetry(url, metadata);
+            var session = new Session(url, stream, this);
             _sessions.TryAdd(url, session);
-            Task.Run(async () =>
-            {
-                await session.Loop();
-            });
+            await session.Loop();
         }
 
-
         internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
         {
             var targetUrl = TargetUrl(assignment);
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
             var request = new rmq::ReceiveMessageRequest
             {
                 Group = new rmq::Resource
@@ -392,7 +393,8 @@ namespace Org.Apache.Rocketmq
                 },
                 MessageQueue = assignment.MessageQueue
             };
-            var messages = await _manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+            var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, 
+                ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
             return messages;
         }
 
@@ -420,8 +422,8 @@ namespace Org.Apache.Rocketmq
             request.Entries.Add(entry);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            return await _manager.Ack(target, metadata, request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            return await Manager.Ack(target, metadata, request, RequestTimeout);
         }
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
@@ -443,8 +445,8 @@ namespace Org.Apache.Rocketmq
             };
 
             var metadata = new grpc::Metadata();
-            Signature.sign(this, metadata);
-            return await _manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
         }
 
         public async Task<bool> NotifyClientTermination()
@@ -454,13 +456,13 @@ namespace Org.Apache.Rocketmq
 
 
             var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             List<Task<Boolean>> tasks = new List<Task<Boolean>>();
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(_manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
+                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -487,9 +489,24 @@ namespace Org.Apache.Rocketmq
                 ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
                 ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
             }
+
+            switch (settings.PubSubCase)
+            {
+                case rmq.Settings.PubSubOneofCase.Publishing:
+                {
+                    ClientSettings.Publishing = settings.Publishing;
+                    break;
+                }
+
+                case rmq.Settings.PubSubOneofCase.Subscription:
+                {
+                    ClientSettings.Subscription = settings.Subscription;
+                    break;
+                }
+            }
         }
 
-        protected readonly IClientManager _manager;
+        protected readonly IClientManager Manager;
 
         private readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
 
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 0d99cb1..86175a2 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -75,15 +75,6 @@ namespace Org.Apache.Rocketmq
             set { credentialsProvider_ = value; }
         }
 
-        public string tenantId()
-        {
-            return _tenantId;
-        }
-        public string TenantId
-        {
-            set { _tenantId = value; }
-        }
-
         public TimeSpan RequestTimeout
         {
             get
@@ -96,15 +87,6 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public TimeSpan getLongPollingTimeout()
-        {
-            return longPollingIoTimeout_;
-        }
-        public TimeSpan LongPollingTimeout
-        {
-            set { longPollingIoTimeout_ = value; }
-        }
-
         public string getGroupName()
         {
             return groupName_;
@@ -139,9 +121,7 @@ namespace Org.Apache.Rocketmq
         protected string _resourceNamespace;
 
         private ICredentialsProvider credentialsProvider_;
-
-        private string _tenantId;
-
+        
         private TimeSpan _requestTimeout;
 
         private TimeSpan longPollingIoTimeout_;
@@ -150,7 +130,7 @@ namespace Org.Apache.Rocketmq
 
         private string clientId_;
 
-        private bool tracingEnabled_ = false;
+        private bool tracingEnabled_;
 
         private string instanceName_ = "default";
 
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs
index 438d7a8..57325b4 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -28,10 +28,6 @@ namespace Org.Apache.Rocketmq
 
         ICredentialsProvider credentialsProvider();
 
-        string tenantId();
-
-        TimeSpan getLongPollingTimeout();
-
         string getGroupName();
 
         string clientId();
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index e337b1a..7d55904 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -161,7 +161,7 @@ namespace Org.Apache.Rocketmq
             }
 
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
 
             Exception ex = null;
 
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
                 {
                     var stopWatch = new Stopwatch();
                     stopWatch.Start();
-                    rmq::SendMessageResponse response = await _manager.SendMessage(target, metadata, request, RequestTimeout);
+                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == response.Status.Code)
                     {
                         var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index c1f1cd6..dc9d753 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -76,7 +76,7 @@ namespace Org.Apache.Rocketmq
 
         public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
         {
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+            var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
             var callOptions = new CallOptions(metadata, deadline);
             return _stub.Telemetry(callOptions);
         }
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index d2011bf..4d09894 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -28,9 +28,11 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public Session(grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+        public Session(string target, 
+            grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
             Client client)
         {
+            _target = target;
             _stream = stream;
             _client = client;
             _channel = Channel.CreateUnbounded<bool>();
@@ -46,13 +48,13 @@ namespace Org.Apache.Rocketmq
             };
             _client.BuildClientSetting(request.Settings);
             await writer.WriteAsync(request);
-            Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
+            Logger.Debug($"Writing Client Settings to {_target} Done: {request.Settings}");
             while (!_client.TelemetryCts().IsCancellationRequested)
             {
                 if (await reader.MoveNext(_client.TelemetryCts().Token))
                 {
                     var cmd = reader.Current;
-                    Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
+                    Logger.Debug($"Received a TelemetryCommand from {_target}: {cmd}");
                     switch (cmd.CommandCase)
                     {
                         case rmq::TelemetryCommand.CommandOneofCase.None:
@@ -71,7 +73,7 @@ namespace Org.Apache.Rocketmq
                                     await _channel.Writer.WriteAsync(true);
                                 }
 
-                                Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                                Logger.Info($"Received settings from {_target}: {cmd.Settings}");
                                 _client.OnSettingsReceived(cmd.Settings);
                                 break;
                             }
@@ -90,7 +92,7 @@ namespace Org.Apache.Rocketmq
                     }
                 }
             }
-            Logger.Info("Telemetry stream cancelled");
+            Logger.Info($"Telemetry stream for {_target} is cancelled");
             await writer.CompleteAsync();
         }
 
@@ -111,5 +113,6 @@ namespace Org.Apache.Rocketmq
         private long _established;
 
         private readonly Channel<bool> _channel;
+        private readonly string _target;
     };
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 2331b53..ec78171 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,22 +16,18 @@
  */
 using System;
 using System.Text;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using System.Security.Cryptography;
 
 namespace Org.Apache.Rocketmq
 {
-    public class Signature
+    public static class Signature
     {
-        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+        public static void Sign(IClientConfig clientConfig, grpc::Metadata metadata)
         {
             metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
             metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
-            if (!String.IsNullOrEmpty(clientConfig.tenantId()))
-            {
-                metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
-            }
 
             if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
             {
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 121e9d3..b9d83ce 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -56,6 +56,7 @@ namespace Org.Apache.Rocketmq
             {
                 settings.Subscription.Subscriptions.Add(kv.Value);
             }
+            Logger.Info($"ClientSettings built OK. {settings}");
         }
 
         public override async Task Start()
@@ -65,29 +66,31 @@ namespace Org.Apache.Rocketmq
             // Scan load assignment periodically
             Schedule(async () =>
             {
-                while (!_scanAssignmentCts.IsCancellationRequested)
-                {
-                    await ScanLoadAssignments();                    
-                }
+                Logger.Debug("Scan load assignments by schedule");
+                await ScanLoadAssignments();
             }, 30, _scanAssignmentCts.Token);
 
             await ScanLoadAssignments();
+            Logger.Debug("Step of #Start: ScanLoadAssignments completed");
         }
 
         public override async Task Shutdown()
         {
+            _scanAssignmentCts.Cancel();
             await base.Shutdown();
             if (!await NotifyClientTermination())
             {
                 Logger.Warn("Failed to NotifyClientTermination");
             }
         }
-
+        
+        /**
+         * For 5.x, we can assume there is a load-balancer before gateway nodes.
+         */
         private async Task ScanLoadAssignments()
         {
-
-            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
-            List<string> topics = new List<string>();
+            var tasks = new List<Task<List<rmq.Assignment>>>();
+            var topics = new List<string>();
             foreach (var sub in _subscriptions)
             {
                 var request = new rmq::QueryAssignmentRequest
@@ -107,7 +110,7 @@ namespace Org.Apache.Rocketmq
 
                 request.Endpoints = new rmq::Endpoints
                 {
-                    Scheme = rmq.AddressScheme.Ipv4
+                    Scheme = AccessPoint.HostScheme()
                 };
                 var address = new rmq::Address
                 {
@@ -117,12 +120,11 @@ namespace Org.Apache.Rocketmq
                 request.Endpoints.Addresses.Add(address);
 
                 var metadata = new Metadata();
-                Signature.sign(this, metadata);
-                tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+                Signature.Sign(this, metadata);
+                tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
             }
 
-            List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
-
+            var list = await Task.WhenAll(tasks);
             var i = 0;
             foreach (var assignments in list)
             {
@@ -211,9 +213,9 @@ namespace Org.Apache.Rocketmq
             
             var targetUrl = Utilities.TargetUrl(messageQueue);
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
+            Signature.Sign(this, metadata);
             
-            return await _manager.ReceiveMessage(targetUrl, metadata, request, 
+            return await Manager.ReceiveMessage(targetUrl, metadata, request, 
                 ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
         }
 
@@ -241,8 +243,8 @@ namespace Org.Apache.Rocketmq
 
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
-            await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
         }
 
         public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
@@ -266,8 +268,8 @@ namespace Org.Apache.Rocketmq
 
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
-            Signature.sign(this, metadata);
-            await _manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
+            Signature.Sign(this, metadata);
+            await Manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
         }
         
         private rmq.MessageQueue NextQueue()
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
index af5983c..e12c027 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.Rocketmq
             clientConfig.CredentialsProvider = credentialsProvider;
             clientConfig.ResourceNamespace = resourceNamespace;
             clientConfig.Region = "cn-hangzhou-pre";
-            Signature.sign(clientConfig, metadata);
+            Signature.Sign(clientConfig, metadata);
             var clientManager = new ClientManager();
             string target = "https://116.62.231.199:80";
             var topicRouteData = clientManager.ResolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index b5094a2..bcf36a5 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -26,30 +26,14 @@ namespace tests
     [TestClass]
     public class ProducerTest
     {
-
-        private static AccessPoint _accessPoint;
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
-        {
-            _accessPoint = new AccessPoint
-            {
-                Host = HOST,
-                Port = PORT
-            };
-        }
-
-        [ClassCleanup]
-        public static void TearDown()
-        {
-        }
-
         [TestMethod]
         public async Task TestLifecycle()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             await producer.Shutdown();
         }
@@ -57,21 +41,26 @@ namespace tests
         [TestMethod]
         public async Task TestSendStandardMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            // Tag the massage. A message has at most one tag.
-            msg.Tag = "Tag-0";
-            
+            var msg = new Message(TOPIC, body)
+            {
+                // Tag the message. A message has at most one tag.
+                Tag = "Tag-0"
+            };
+
             // Associate the message with one or multiple keys
-            var keys = new List<string>();
-            keys.Add("k1");
-            keys.Add("k2");
+            var keys = new List<string>
+            {
+                "k1",
+                "k2"
+            };
             msg.Keys = keys;
             
             var sendResult = await producer.Send(msg);
@@ -82,23 +71,28 @@ namespace tests
         [TestMethod]
         public async Task TestSendMultipleMessages()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             for (var i = 0; i < 128; i++)
             {
-                var msg = new Message(topic, body);
-            
-                // Tag the massage. A message has at most one tag.
-                msg.Tag = "Tag-0";
-            
+                var msg = new Message(TOPIC, body)
+                {
+                    // Tag the message. A message has at most one tag.
+                    Tag = "Tag-0"
+                };
+
                 // Associate the message with one or multiple keys
-                var keys = new List<string>();
-                keys.Add("k1");
-                keys.Add("k2");
+                var keys = new List<string>
+                {
+                    "k1",
+                    "k2"
+                };
                 msg.Keys = keys;
                 var sendResult = await producer.Send(msg);
                 Assert.IsNotNull(sendResult);                
@@ -109,17 +103,20 @@ namespace tests
         [TestMethod]
         public async Task TestSendFifoMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            // Messages of the same group will get delivered one after another. 
-            msg.MessageGroup = "message-group-0";
-            
+            var msg = new Message(TOPIC, body)
+            {
+                // Messages of the same group will get delivered one after another. 
+                MessageGroup = "message-group-0"
+            };
+
             // Verify messages are FIFO iff their message group is not null or empty.
             Assert.IsTrue(msg.Fifo());
 
@@ -131,15 +128,19 @@ namespace tests
         [TestMethod]
         public async Task TestSendScheduledMessage()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            
-            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            var msg = new Message(TOPIC, body)
+            {
+                DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+            };
+
             Assert.IsTrue(msg.Scheduled());
             
             var sendResult = await producer.Send(msg);
@@ -154,15 +155,19 @@ namespace tests
         [TestMethod]
         public async Task TestSendMessage_Failure()
         {
-            var producer = new Producer($"{HOST}:{PORT}");
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
+            var producer = new Producer($"{HOST}:{PORT}")
+            {
+                CredentialsProvider = new ConfigFileCredentialsProvider(),
+                Region = "cn-hangzhou-pre"
+            };
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            msg.MessageGroup = "Group-0";
-            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            var msg = new Message(TOPIC, body)
+            {
+                MessageGroup = "Group-0",
+                DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+            };
             Assert.IsTrue(msg.Scheduled());
 
             try
@@ -176,10 +181,9 @@ namespace tests
             await producer.Shutdown();
         }
         
-        private static string topic = "cpp_sdk_standard";
-
-        private static string HOST = "127.0.0.1";
-        private static int PORT = 8081;
+        private const string TOPIC = "cpp_sdk_standard";
+        private const string HOST = "127.0.0.1";
+        private const int PORT = 8081;
     }
 
 }
\ No newline at end of file
diff --git a/csharp/tests/RpcClientTest.cs b/csharp/tests/RpcClientTest.cs
index a1ecf82..b438047 100644
--- a/csharp/tests/RpcClientTest.cs
+++ b/csharp/tests/RpcClientTest.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
 
             var cmd = new rmq::TelemetryCommand();
             cmd.Settings = new rmq::Settings();
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
             request.Topic.Name = "cpp_sdk_standard";
@@ -128,7 +128,7 @@ namespace Org.Apache.Rocketmq
             var rpc_client = new RpcClient(target);
             var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(client_config, metadata);
+            Signature.Sign(client_config, metadata);
 
             var request = new rmq::SendMessageRequest();
             var message = new rmq::Message();
diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs
index 16d0f46..12d1c10 100644
--- a/csharp/tests/SignatureTest.cs
+++ b/csharp/tests/SignatureTest.cs
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using Moq;
-using System;
+using Org.Apache.Rocketmq;
 
-namespace Org.Apache.Rocketmq
+namespace tests
 {
 
     [TestClass]
@@ -27,11 +27,10 @@ namespace Org.Apache.Rocketmq
     {
 
         [TestMethod]
-        public void testSign()
+        public void TestSign()
         {
             var mock = new Mock<IClientConfig>();
             mock.Setup(x => x.getGroupName()).Returns("G1");
-            mock.Setup(x => x.tenantId()).Returns("Tenant-id");
             mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
             mock.Setup(x => x.serviceName()).Returns("mq");
             mock.Setup(x => x.region()).Returns("cn-hangzhou");
@@ -42,7 +41,7 @@ namespace Org.Apache.Rocketmq
             mock.Setup(x => x.credentialsProvider()).Returns(credentialsProvider);
 
             var metadata = new grpc::Metadata();
-            Signature.sign(mock.Object, metadata);
+            Signature.Sign(mock.Object, metadata);
             Assert.IsNotNull(metadata.Get(MetadataConstants.AUTHORIZATION));
         }
     }