You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/21 07:17:16 UTC

[rocketmq-client-csharp] branch develop updated: Ensure rpc stub dispose correctly when shutdown (#10)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new fdaceb8  Ensure rpc stub dispose correctly when shutdown (#10)
fdaceb8 is described below

commit fdaceb8a9c781c7be7c7787a200c908edd2290cd
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Mon Feb 21 15:17:02 2022 +0800

    Ensure rpc stub dispose correctly when shutdown (#10)
    
    * Transparent retry when query topic route entries
    
    * Make sure stub shutdown gracefully
---
 rocketmq-client-csharp/Client.cs         | 56 ++++++++++++++++++++------------
 rocketmq-client-csharp/ClientManager.cs  | 19 +++++++++++
 rocketmq-client-csharp/IClientManager.cs |  1 +
 rocketmq-client-csharp/IProducer.cs      |  6 ++--
 rocketmq-client-csharp/IRpcClient.cs     |  2 ++
 rocketmq-client-csharp/Producer.cs       | 10 +++---
 rocketmq-client-csharp/RpcClient.cs      | 13 ++++++--
 tests/ProducerTest.cs                    |  6 ++--
 8 files changed, 80 insertions(+), 33 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index dac6d89..78e2f44 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -40,7 +40,7 @@ namespace org.apache.rocketmq
             this.updateTopicRouteCTS = new CancellationTokenSource();
         }
 
-        public virtual void start()
+        public virtual void Start()
         {
             schedule(async () =>
             {
@@ -55,10 +55,11 @@ namespace org.apache.rocketmq
 
         }
 
-        public virtual void shutdown()
+        public virtual async Task Shutdown()
         {
             updateTopicRouteCTS.Cancel();
             nameServerResolverCTS.Cancel();
+            await clientManager.Shutdown();
         }
 
         private async Task updateNameServerList()
@@ -175,24 +176,37 @@ namespace org.apache.rocketmq
                 }
             }
 
-            // We got one or more name servers available.
-            string nameServer = nameServers[currentNameServerIndex];
-            var request = new rmq::QueryRouteRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace_;
-            request.Topic.Name = topic;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
-            var address = new rmq::Address();
-            int pos = nameServer.LastIndexOf(':');
-            address.Host = nameServer.Substring(0, pos);
-            address.Port = Int32.Parse(nameServer.Substring(pos + 1));
-            request.Endpoints.Addresses.Add(address);
-            var target = string.Format("https://{0}:{1}", address.Host, address.Port);
-            var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
-            var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
-            return topicRouteData;
+
+            for (int retry = 0; retry < MaxTransparentRetry; retry++)
+            {
+                // We got one or more name servers available.
+                int index = (currentNameServerIndex + retry) % nameServers.Count;
+                string nameServer = nameServers[index];
+                var request = new rmq::QueryRouteRequest();
+                request.Topic = new rmq::Resource();
+                request.Topic.ResourceNamespace = resourceNamespace_;
+                request.Topic.Name = topic;
+                request.Endpoints = new rmq::Endpoints();
+                request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+                var address = new rmq::Address();
+                int pos = nameServer.LastIndexOf(':');
+                address.Host = nameServer.Substring(0, pos);
+                address.Port = Int32.Parse(nameServer.Substring(pos + 1));
+                request.Endpoints.Addresses.Add(address);
+                var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+                var metadata = new grpc.Metadata();
+                Signature.sign(this, metadata);
+                var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
+                if (null != topicRouteData)
+                {
+                    if (retry > 0)
+                    {
+                        currentNameServerIndex = index;
+                    }
+                    return topicRouteData;
+                }
+            }
+            return null;
         }
 
         public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
@@ -258,5 +272,7 @@ namespace org.apache.rocketmq
 
         private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
         private CancellationTokenSource updateTopicRouteCTS;
+
+        private const int MaxTransparentRetry = 3;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 87697d0..e79b844 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -162,6 +162,25 @@ namespace org.apache.rocketmq {
             return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
         }
 
+        public async Task Shutdown()
+        {
+            List<Task> tasks = new List<Task>();
+            _clientLock.EnterReadLock(); // Guards only the enumeration of _rpcClients.
+            try
+            {
+                foreach (var item in _rpcClients)
+                {
+                    tasks.Add(item.Value.Shutdown());
+                }
+            }
+            finally
+            {
+                _clientLock.ExitReadLock();
+            }
+            // Await after releasing the lock: ReaderWriterLockSlim is thread-affine and the continuation may resume on another thread.
+            await Task.WhenAll(tasks);
+        }
+
         private readonly Dictionary<string, RpcClient> _rpcClients;
         private readonly ReaderWriterLockSlim _clientLock;
     }
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 08ed86a..361638a 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -32,5 +32,6 @@ namespace org.apache.rocketmq {
 
         Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
 
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 89f8955..34a3fbd 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -19,11 +19,11 @@ using System.Threading.Tasks;
 
 namespace org.apache.rocketmq {
     public interface IProducer {
-        void start();
+        void Start();
 
-        void shutdown();
+        Task Shutdown();
 
-        Task<SendResult> send(Message message);
+        Task<SendResult> Send(Message message);
         
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index f46afae..86aeab2 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -32,5 +32,7 @@ namespace org.apache.rocketmq
             NotifyClientTerminationRequest request, TimeSpan timeout);
 
         Task<SendMessageResponse> SendMessage(Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 0b3f8a0..ab5f48d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -32,16 +32,16 @@ namespace org.apache.rocketmq
             this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
         }
 
-        public override void start()
+        public override void Start()
         {
-            base.start();
+            base.Start();
             // More initalization
         }
 
-        public override void shutdown()
+        public override async Task Shutdown()
         {
             // Release local resources
-            base.shutdown();
+            await base.Shutdown();
         }
 
         public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
@@ -49,7 +49,7 @@ namespace org.apache.rocketmq
 
         }
 
-        public async Task<SendResult> send(Message message)
+        public async Task<SendResult> Send(Message message)
         {
             if (!loadBalancer.ContainsKey(message.Topic))
             {
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 8279953..9796e2c 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -30,17 +30,26 @@ namespace org.apache.rocketmq
     public class RpcClient : IRpcClient
     {
         private readonly MessagingService.MessagingServiceClient _stub;
+        private GrpcChannel _channel;
 
         public RpcClient(string target)
         {
-            var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
             {
                 HttpHandler = CreateHttpHandler()
             });
-            var invoker = channel.Intercept(new ClientLoggerInterceptor());
+            var invoker = _channel.Intercept(new ClientLoggerInterceptor());
             _stub = new MessagingService.MessagingServiceClient(invoker);
         }
 
+        public async Task Shutdown()
+        {
+            // Shut down the gRPC channel held by this client; when no channel
+            // is held there is nothing to wait for.
+            Task pending = null == _channel ? Task.CompletedTask : _channel.ShutdownAsync();
+            await pending;
+        }
+
         /**
          * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
          * why parameters are configured this way.
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 961b167..c05be97 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -49,13 +49,13 @@ namespace org.apache.rocketmq
             producer.ResourceNamespace = resourceNamespace;
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
-            producer.start();
+            producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             var msg = new Message(topic, body);
-            var sendResult = producer.send(msg).GetAwaiter().GetResult();
+            var sendResult = producer.Send(msg).GetAwaiter().GetResult();
             Assert.IsNotNull(sendResult);
-            producer.shutdown();
+            producer.Shutdown().GetAwaiter().GetResult();
         }
 
         private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";