You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/21 07:17:16 UTC
[rocketmq-client-csharp] branch develop updated: Ensure rpc stub dispose correctly when shutdown (#10)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new fdaceb8 Ensure rpc stub dispose correctly when shutdown (#10)
fdaceb8 is described below
commit fdaceb8a9c781c7be7c7787a200c908edd2290cd
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Mon Feb 21 15:17:02 2022 +0800
Ensure rpc stub dispose correctly when shutdown (#10)
* Transparent retry when query topic route entries
* Make sure stub shutdown gracefully
---
rocketmq-client-csharp/Client.cs | 56 ++++++++++++++++++++------------
rocketmq-client-csharp/ClientManager.cs | 19 +++++++++++
rocketmq-client-csharp/IClientManager.cs | 1 +
rocketmq-client-csharp/IProducer.cs | 6 ++--
rocketmq-client-csharp/IRpcClient.cs | 2 ++
rocketmq-client-csharp/Producer.cs | 10 +++---
rocketmq-client-csharp/RpcClient.cs | 13 ++++++--
tests/ProducerTest.cs | 6 ++--
8 files changed, 80 insertions(+), 33 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index dac6d89..78e2f44 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -40,7 +40,7 @@ namespace org.apache.rocketmq
this.updateTopicRouteCTS = new CancellationTokenSource();
}
- public virtual void start()
+ public virtual void Start()
{
schedule(async () =>
{
@@ -55,10 +55,11 @@ namespace org.apache.rocketmq
}
- public virtual void shutdown()
+ public virtual async Task Shutdown()
{
updateTopicRouteCTS.Cancel();
nameServerResolverCTS.Cancel();
+ await clientManager.Shutdown();
}
private async Task updateNameServerList()
@@ -175,24 +176,37 @@ namespace org.apache.rocketmq
}
}
- // We got one or more name servers available.
- string nameServer = nameServers[currentNameServerIndex];
- var request = new rmq::QueryRouteRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = resourceNamespace_;
- request.Topic.Name = topic;
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
- var address = new rmq::Address();
- int pos = nameServer.LastIndexOf(':');
- address.Host = nameServer.Substring(0, pos);
- address.Port = Int32.Parse(nameServer.Substring(pos + 1));
- request.Endpoints.Addresses.Add(address);
- var target = string.Format("https://{0}:{1}", address.Host, address.Port);
- var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
- var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
- return topicRouteData;
+
+ for (int retry = 0; retry < MaxTransparentRetry; retry++)
+ {
+ // We got one or more name servers available.
+ int index = (currentNameServerIndex + retry) % nameServers.Count;
+ string nameServer = nameServers[index];
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace_;
+ request.Topic.Name = topic;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ int pos = nameServer.LastIndexOf(':');
+ address.Host = nameServer.Substring(0, pos);
+ address.Port = Int32.Parse(nameServer.Substring(pos + 1));
+ request.Endpoints.Addresses.Add(address);
+ var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+ var metadata = new grpc.Metadata();
+ Signature.sign(this, metadata);
+ var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
+ if (null != topicRouteData)
+ {
+ if (retry > 0)
+ {
+ currentNameServerIndex = index;
+ }
+ return topicRouteData;
+ }
+ }
+ return null;
}
public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
@@ -258,5 +272,7 @@ namespace org.apache.rocketmq
private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
private CancellationTokenSource updateTopicRouteCTS;
+
+ private const int MaxTransparentRetry = 3;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 87697d0..e79b844 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -162,6 +162,25 @@ namespace org.apache.rocketmq {
return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
}
+ /// <summary>
+ /// Shuts down every RPC client currently registered in <c>_rpcClients</c>.
+ /// </summary>
+ /// <returns>A task that completes once all per-client shutdown tasks have completed.</returns>
+ public async Task Shutdown()
+ {
+     // Snapshot the clients under the read lock and release it BEFORE awaiting.
+     // ReaderWriterLockSlim has managed-thread affinity: after an await the
+     // continuation may resume on a different thread, where ExitReadLock()
+     // would throw SynchronizationLockException.
+     List<RpcClient> clients;
+     _clientLock.EnterReadLock();
+     try
+     {
+         clients = new List<RpcClient>(_rpcClients.Values);
+     }
+     finally
+     {
+         _clientLock.ExitReadLock();
+     }
+
+     // Start all shutdowns first, then wait for them together.
+     List<Task> tasks = new List<Task>(clients.Count);
+     foreach (var client in clients)
+     {
+         tasks.Add(client.Shutdown());
+     }
+     await Task.WhenAll(tasks);
+ }
+
private readonly Dictionary<string, RpcClient> _rpcClients;
private readonly ReaderWriterLockSlim _clientLock;
}
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 08ed86a..361638a 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -32,5 +32,6 @@ namespace org.apache.rocketmq {
Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+ Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 89f8955..34a3fbd 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -19,11 +19,11 @@ using System.Threading.Tasks;
namespace org.apache.rocketmq {
public interface IProducer {
- void start();
+ void Start();
- void shutdown();
+ Task Shutdown();
- Task<SendResult> send(Message message);
+ Task<SendResult> Send(Message message);
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index f46afae..86aeab2 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -32,5 +32,7 @@ namespace org.apache.rocketmq
NotifyClientTerminationRequest request, TimeSpan timeout);
Task<SendMessageResponse> SendMessage(Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
+ Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 0b3f8a0..ab5f48d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -32,16 +32,16 @@ namespace org.apache.rocketmq
this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
}
- public override void start()
+ public override void Start()
{
- base.start();
+ base.Start();
// More initalization
}
- public override void shutdown()
+ public override async Task Shutdown()
{
// Release local resources
- base.shutdown();
+ await base.Shutdown();
}
public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
@@ -49,7 +49,7 @@ namespace org.apache.rocketmq
}
- public async Task<SendResult> send(Message message)
+ public async Task<SendResult> Send(Message message)
{
if (!loadBalancer.ContainsKey(message.Topic))
{
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 8279953..9796e2c 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -30,17 +30,26 @@ namespace org.apache.rocketmq
public class RpcClient : IRpcClient
{
private readonly MessagingService.MessagingServiceClient _stub;
+ private GrpcChannel _channel;
public RpcClient(string target)
{
- var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+ _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
{
HttpHandler = CreateHttpHandler()
});
- var invoker = channel.Intercept(new ClientLoggerInterceptor());
+ var invoker = _channel.Intercept(new ClientLoggerInterceptor());
_stub = new MessagingService.MessagingServiceClient(invoker);
}
+ /// <summary>
+ /// Shuts down the gRPC channel held by this client, if there is one.
+ /// </summary>
+ /// <returns>A task that completes once <c>_channel.ShutdownAsync()</c> has completed.</returns>
+ public async Task Shutdown()
+ {
+     // Nothing to release when no channel is held.
+     if (_channel == null)
+     {
+         return;
+     }
+
+     await _channel.ShutdownAsync();
+ }
+
/**
* See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
* why parameters are configured this way.
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 961b167..c05be97 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -49,13 +49,13 @@ namespace org.apache.rocketmq
producer.ResourceNamespace = resourceNamespace;
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
- producer.start();
+ producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
var msg = new Message(topic, body);
- var sendResult = producer.send(msg).GetAwaiter().GetResult();
+ var sendResult = producer.Send(msg).GetAwaiter().GetResult();
Assert.IsNotNull(sendResult);
- producer.shutdown();
+ producer.Shutdown().GetAwaiter().GetResult();
}
private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";