You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/23 02:57:20 UTC
[rocketmq-client-csharp] branch observability updated: WIP: refactor start procedure
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new c7299c6 WIP: refactor start procedure
c7299c6 is described below
commit c7299c641fecc4f231433be86324ec1147870df4
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 23 10:57:12 2022 +0800
WIP: refactor start procedure
---
rocketmq-client-csharp/Client.cs | 64 ++++++++++++++++++++++++++++++--
rocketmq-client-csharp/ClientManager.cs | 3 ++
rocketmq-client-csharp/IClient.cs | 3 ++
rocketmq-client-csharp/IConsumer.cs | 2 +-
rocketmq-client-csharp/IProducer.cs | 2 +-
rocketmq-client-csharp/Producer.cs | 4 +-
rocketmq-client-csharp/PushConsumer.cs | 4 +-
rocketmq-client-csharp/Session.cs | 40 ++++++++++++++------
rocketmq-client-csharp/SimpleConsumer.cs | 9 ++---
tests/ProducerTest.cs | 2 +-
tests/SimpleConsumerTest.cs | 7 ++--
11 files changed, 110 insertions(+), 30 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 28ef42b..b2f9369 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -19,6 +19,7 @@ using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
+using System.Diagnostics;
using System;
using rmq = Apache.Rocketmq.V2;
using grpc = global::Grpc.Core;
@@ -64,21 +65,34 @@ namespace Org.Apache.Rocketmq
_updateTopicRouteCts = new CancellationTokenSource();
_healthCheckCts = new CancellationTokenSource();
+
+ telemetryCts_ = new CancellationTokenSource();
}
- public virtual void Start()
+ public virtual async Task Start()
{
schedule(async () =>
{
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
+
+ // Get routes for topics of interest.
+ await UpdateTopicRoute();
+
+ string accessPointUrl = _accessPoint.TargetUrl();
+ createSession(accessPointUrl);
+
+ await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
+
+ await Heartbeat();
}
public virtual async Task Shutdown()
{
Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
+ telemetryCts_.Cancel();
await Manager.Shutdown();
}
@@ -120,32 +134,58 @@ namespace Org.Apache.Rocketmq
private async Task UpdateTopicRoute()
{
- List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+ HashSet<string> topics = new HashSet<string>();
+ foreach (var topic in topicsOfInterest_)
+ {
+ topics.Add(topic);
+ }
+
foreach (var item in _topicRouteTable)
{
- tasks.Add(GetRouteFor(item.Key, true));
+ topics.Add(item.Key);
+ }
+ Logger.Debug($"Fetch topic route for {topics.Count} topics");
+
+ // Wrap topics into list such that we can map async result to topic
+ List<string> topicList = new List<string>();
+ topicList.AddRange(topics);
+
+ List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+ foreach (var item in topicList)
+ {
+ tasks.Add(GetRouteFor(item, true));
}
// Update topic route data
TopicRouteData[] result = await Task.WhenAll(tasks);
+ var i = 0;
foreach (var item in result)
{
if (null == item)
{
+ Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
+ ++i;
continue;
}
if (0 == item.MessageQueues.Count)
{
+ Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
+ ++i;
continue;
}
var topicName = item.MessageQueues[0].Topic.Name;
+
+ // Make assertion
+ Debug.Assert(topicName.Equals(topicList[i]));
+
var existing = _topicRouteTable[topicName];
if (!existing.Equals(item))
{
_topicRouteTable[topicName] = item;
}
+ ++i;
}
}
@@ -202,6 +242,7 @@ namespace Org.Apache.Rocketmq
TopicRouteData topicRouteData;
try
{
+ Logger.Debug($"Resolving route for topic={topic}");
topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
{
@@ -310,6 +351,7 @@ namespace Org.Apache.Rocketmq
Signature.sign(this, metadata);
var stream = Manager.Telemetry(url, metadata);
var session = new Session(url, stream, this);
+ _sessions.TryAdd(url, session);
Task.Run(async () =>
{
await session.Loop();
@@ -415,16 +457,32 @@ namespace Org.Apache.Rocketmq
protected readonly IClientManager Manager;
+ private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>();
+
+ public void AddTopicOfInterest(string topic)
+ {
+ topicsOfInterest_.Add(topic);
+ }
+
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
private readonly CancellationTokenSource _healthCheckCts;
+ private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource();
+
+ public CancellationTokenSource TelemetryCts()
+ {
+ return telemetryCts_;
+ }
+
protected readonly AccessPoint _accessPoint;
// This field is subject changes from servers.
protected rmq::Settings _clientSettings;
private Random random = new Random();
+
+ private ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 54ceff2..c48d4d3 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -84,6 +84,7 @@ namespace Org.Apache.Rocketmq
rmq::QueryRouteRequest request, TimeSpan timeout)
{
var rpcClient = GetRpcClient(target);
+ Logger.Debug($"QueryRouteRequest: {request.ToString()}");
var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
if (queryRouteResponse.Status.Code != rmq::Code.Ok)
@@ -91,6 +92,7 @@ namespace Org.Apache.Rocketmq
Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status.ToString()}");
// Raise an application layer exception
}
+ Logger.Debug($"QueryRouteResponse: {queryRouteResponse.ToString()}");
var messageQueues = new List<rmq::MessageQueue>();
foreach (var messageQueue in queryRouteResponse.MessageQueues)
@@ -105,6 +107,7 @@ namespace Org.Apache.Rocketmq
TimeSpan timeout)
{
var rpcClient = GetRpcClient(target);
+ Logger.Debug($"Heartbeat to {target}, Request: {request.ToString()}");
var response = await rpcClient.Heartbeat(metadata, request, timeout);
Logger.Debug($"Heartbeat to {target} response status: {response.Status.ToString()}");
return response.Status.Code == rmq::Code.Ok;
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 461c452..3352028 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,6 +16,7 @@
*/
using System.Threading.Tasks;
+using System.Threading;
using System;
using rmq = Apache.Rocketmq.V2;
@@ -32,5 +33,7 @@ namespace Org.Apache.Rocketmq
void OnSettingsReceived(rmq::Settings settings);
+
+ CancellationTokenSource TelemetryCts();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs
index de27f1f..2ad0dab 100644
--- a/rocketmq-client-csharp/IConsumer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -20,7 +20,7 @@ namespace Org.Apache.Rocketmq
{
public interface IConsumer
{
- void Start();
+ Task Start();
Task Shutdown();
}
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 088df5e..420af20 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -21,7 +21,7 @@ namespace Org.Apache.Rocketmq
{
public interface IProducer
{
- void Start();
+ Task Start();
Task Shutdown();
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 4a39f33..452f699 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -33,9 +33,9 @@ namespace Org.Apache.Rocketmq
this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
}
- public override void Start()
+ public override async Task Start()
{
- base.Start();
+ await base.Start();
// More initialization
}
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index 3b37950..cc30943 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -34,14 +34,14 @@ namespace Org.Apache.Rocketmq
_scanExpiredProcessQueueCTS = new CancellationTokenSource();
}
- public override async void Start()
+ public override async Task Start()
{
if (null == _messageListener)
{
throw new System.Exception("Bad configuration: message listener is required");
}
- base.Start();
+ await base.Start();
// Step-1: Resolve topic routes
List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 51eb09c..44fa5fc 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -16,6 +16,7 @@
*/
using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
using grpc = global::Grpc.Core;
using NLog;
@@ -34,6 +35,7 @@ namespace Org.Apache.Rocketmq
this._target = target;
this._stream = stream;
this._client = client;
+ this._channel = Channel.CreateUnbounded<bool>();
}
public async Task Loop()
@@ -45,9 +47,9 @@ namespace Org.Apache.Rocketmq
_client.BuildClientSetting(request.Settings);
await writer.WriteAsync(request);
Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
- while (!_cts.IsCancellationRequested)
+ while (!_client.TelemetryCts().IsCancellationRequested)
{
- if (await reader.MoveNext(_cts.Token))
+ if (await reader.MoveNext(_client.TelemetryCts().Token))
{
var cmd = reader.Current;
Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
@@ -56,10 +58,19 @@ namespace Org.Apache.Rocketmq
case rmq::TelemetryCommand.CommandOneofCase.None:
{
Logger.Warn($"Telemetry failed: {cmd.Status}");
+ if (0 == Interlocked.CompareExchange(ref _established, 0, 2))
+ {
+ await _channel.Writer.WriteAsync(false);
+ }
break;
}
case rmq::TelemetryCommand.CommandOneofCase.Settings:
{
+ if (0 == Interlocked.CompareExchange(ref _established, 0, 1))
+ {
+ await _channel.Writer.WriteAsync(true);
+ }
+
Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
_client.OnSettingsReceived(cmd.Settings);
break;
@@ -79,11 +90,8 @@ namespace Org.Apache.Rocketmq
}
}
}
- }
-
- public void Cancel()
- {
- _cts.Cancel();
+ Logger.Info("Telemetry stream cancelled");
+ await writer.CompleteAsync();
}
private string _target;
@@ -93,14 +101,22 @@ namespace Org.Apache.Rocketmq
get { return _target; }
}
+ public async Task AwaitSettingNegotiationCompletion()
+ {
+ if (0 != Interlocked.Read(ref _established))
+ {
+ return;
+ }
+
+ Logger.Debug("Await setting negotiation");
+ await _channel.Reader.ReadAsync();
+ }
+
private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
private IClient _client;
- private CancellationTokenSource _cts = new CancellationTokenSource();
+ private long _established = 0;
- public CancellationTokenSource CTS
- {
- get { return _cts; }
- }
+ private Channel<bool> _channel;
};
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index afd447a..8be51d7 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -18,7 +18,6 @@
using rmq = Apache.Rocketmq.V2;
using NLog;
using System.Threading.Tasks;
-using System.Collections.Generic;
using System.Collections.Concurrent;
using Apache.Rocketmq.V2;
@@ -52,10 +51,9 @@ namespace Org.Apache.Rocketmq
}
}
- public override void Start()
+ public override async Task Start()
{
- base.Start();
- base.createSession(_accessPoint.TargetUrl());
+ await base.Start();
}
public override async Task Shutdown()
@@ -85,6 +83,7 @@ namespace Org.Apache.Rocketmq
entry.Expression.Type = filterType;
entry.Expression.Expression = expression;
subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+ AddTopicOfInterest(topic);
}
public override void OnSettingsReceived(Settings settings)
@@ -94,8 +93,8 @@ namespace Org.Apache.Rocketmq
if (settings.Subscription.Fifo)
{
fifo_ = true;
+ Logger.Info($"#OnSettingsReceived: Group {Group} is FIFO");
}
-
}
private string group_;
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index baeca17..faf0f45 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -46,7 +46,7 @@ namespace Org.Apache.Rocketmq
var producer = new Producer(accessPoint, resourceNamespace);
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
- producer.Start();
+ await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
var msg = new Message(topic, body);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 29f155f..70bcdd3 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -30,7 +30,8 @@ namespace Org.Apache.Rocketmq
public async Task TestStart()
{
var accessPoint = new AccessPoint();
- var host = "11.166.42.94";
+ // var host = "11.166.42.94";
+ var host = "127.0.0.1";
var port = 8081;
accessPoint.Host = host;
accessPoint.Port = port;
@@ -40,8 +41,8 @@ namespace Org.Apache.Rocketmq
var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
- simpleConsumer.Start();
- Thread.Sleep(10_000);
+ await simpleConsumer.Start();
+ Thread.Sleep(1_000);
await simpleConsumer.Shutdown();
}