You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/23 02:57:20 UTC

[rocketmq-client-csharp] branch observability updated: WIP: refactor start procedure

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new c7299c6  WIP: refactor start procedure
c7299c6 is described below

commit c7299c641fecc4f231433be86324ec1147870df4
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 23 10:57:12 2022 +0800

    WIP: refactor start procedure
---
 rocketmq-client-csharp/Client.cs         | 64 ++++++++++++++++++++++++++++++--
 rocketmq-client-csharp/ClientManager.cs  |  3 ++
 rocketmq-client-csharp/IClient.cs        |  3 ++
 rocketmq-client-csharp/IConsumer.cs      |  2 +-
 rocketmq-client-csharp/IProducer.cs      |  2 +-
 rocketmq-client-csharp/Producer.cs       |  4 +-
 rocketmq-client-csharp/PushConsumer.cs   |  4 +-
 rocketmq-client-csharp/Session.cs        | 40 ++++++++++++++------
 rocketmq-client-csharp/SimpleConsumer.cs |  9 ++---
 tests/ProducerTest.cs                    |  2 +-
 tests/SimpleConsumerTest.cs              |  7 ++--
 11 files changed, 110 insertions(+), 30 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 28ef42b..b2f9369 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -19,6 +19,7 @@ using System.Collections.Generic;
 using System.Collections.Concurrent;
 using System.Threading.Tasks;
 using System.Threading;
+using System.Diagnostics;
 using System;
 using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
@@ -64,21 +65,34 @@ namespace Org.Apache.Rocketmq
             _updateTopicRouteCts = new CancellationTokenSource();
 
             _healthCheckCts = new CancellationTokenSource();
+
+            telemetryCts_ = new CancellationTokenSource();
         }
 
-        public virtual void Start()
+        public virtual async Task Start()
         {
             schedule(async () =>
             {
                 await UpdateTopicRoute();
 
             }, 30, _updateTopicRouteCts.Token);
+
+            // Get routes for topics of interest.
+            await UpdateTopicRoute();
+
+            string accessPointUrl = _accessPoint.TargetUrl();
+            createSession(accessPointUrl);
+
+            await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
+
+            await Heartbeat();
         }
 
         public virtual async Task Shutdown()
         {
             Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
+            telemetryCts_.Cancel();
             await Manager.Shutdown();
         }
 
@@ -120,32 +134,58 @@ namespace Org.Apache.Rocketmq
 
         private async Task UpdateTopicRoute()
         {
-            List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+            HashSet<string> topics = new HashSet<string>();
+            foreach (var topic in topicsOfInterest_)
+            {
+                topics.Add(topic);
+            }
+
             foreach (var item in _topicRouteTable)
             {
-                tasks.Add(GetRouteFor(item.Key, true));
+                topics.Add(item.Key);
+            }
+            Logger.Debug($"Fetch topic route for {topics.Count} topics");
+
+            // Wrap topics into list such that we can map async result to topic 
+            List<string> topicList = new List<string>();
+            topicList.AddRange(topics);
+
+            List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+            foreach (var item in topicList)
+            {
+                tasks.Add(GetRouteFor(item, true));
             }
 
             // Update topic route data
             TopicRouteData[] result = await Task.WhenAll(tasks);
+            var i = 0;
             foreach (var item in result)
             {
                 if (null == item)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
+                    ++i;
                     continue;
                 }
 
                 if (0 == item.MessageQueues.Count)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
+                    ++i;
                     continue;
                 }
 
                 var topicName = item.MessageQueues[0].Topic.Name;
+
+                // Make assertion
+                Debug.Assert(topicName.Equals(topicList[i]));
+
                 var existing = _topicRouteTable[topicName];
                 if (!existing.Equals(item))
                 {
                     _topicRouteTable[topicName] = item;
                 }
+                ++i;
             }
         }
 
@@ -202,6 +242,7 @@ namespace Org.Apache.Rocketmq
             TopicRouteData topicRouteData;
             try
             {
+                Logger.Debug($"Resolving route for topic={topic}");
                 topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
                 if (null != topicRouteData)
                 {
@@ -310,6 +351,7 @@ namespace Org.Apache.Rocketmq
             Signature.sign(this, metadata);
             var stream = Manager.Telemetry(url, metadata);
             var session = new Session(url, stream, this);
+            _sessions.TryAdd(url, session);
             Task.Run(async () =>
             {
                 await session.Loop();
@@ -415,16 +457,32 @@ namespace Org.Apache.Rocketmq
 
         protected readonly IClientManager Manager;
 
+        private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>();
+
+        public void AddTopicOfInterest(string topic)
+        {
+            topicsOfInterest_.Add(topic);
+        }
+
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
         private readonly CancellationTokenSource _updateTopicRouteCts;
 
         private readonly CancellationTokenSource _healthCheckCts;
 
+        private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource();
+
+        public CancellationTokenSource TelemetryCts()
+        {
+            return telemetryCts_;
+        }
+
         protected readonly AccessPoint _accessPoint;
 
         // This field is subject changes from servers.
         protected rmq::Settings _clientSettings;
 
         private Random random = new Random();
+
+        private ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 54ceff2..c48d4d3 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -84,6 +84,7 @@ namespace Org.Apache.Rocketmq
             rmq::QueryRouteRequest request, TimeSpan timeout)
         {
             var rpcClient = GetRpcClient(target);
+            Logger.Debug($"QueryRouteRequest: {request.ToString()}");
             var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
 
             if (queryRouteResponse.Status.Code != rmq::Code.Ok)
@@ -91,6 +92,7 @@ namespace Org.Apache.Rocketmq
                 Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status.ToString()}");
                 // Raise an application layer exception
             }
+            Logger.Debug($"QueryRouteResponse: {queryRouteResponse.ToString()}");
 
             var messageQueues = new List<rmq::MessageQueue>();
             foreach (var messageQueue in queryRouteResponse.MessageQueues)
@@ -105,6 +107,7 @@ namespace Org.Apache.Rocketmq
             TimeSpan timeout)
         {
             var rpcClient = GetRpcClient(target);
+            Logger.Debug($"Heartbeat to {target}, Request: {request.ToString()}");
             var response = await rpcClient.Heartbeat(metadata, request, timeout);
             Logger.Debug($"Heartbeat to {target} response status: {response.Status.ToString()}");
             return response.Status.Code == rmq::Code.Ok;
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 461c452..3352028 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,6 +16,7 @@
  */
 
 using System.Threading.Tasks;
+using System.Threading;
 using System;
 using rmq = Apache.Rocketmq.V2;
 
@@ -32,5 +33,7 @@ namespace Org.Apache.Rocketmq
 
 
         void OnSettingsReceived(rmq::Settings settings);
+
+        CancellationTokenSource TelemetryCts();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs
index de27f1f..2ad0dab 100644
--- a/rocketmq-client-csharp/IConsumer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -20,7 +20,7 @@ namespace Org.Apache.Rocketmq
 {
     public interface IConsumer
     {
-        void Start();
+        Task Start();
 
         Task Shutdown();
     }
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 088df5e..420af20 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -21,7 +21,7 @@ namespace Org.Apache.Rocketmq
 {
     public interface IProducer
     {
-        void Start();
+        Task Start();
 
         Task Shutdown();
 
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 4a39f33..452f699 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -33,9 +33,9 @@ namespace Org.Apache.Rocketmq
             this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
         }
 
-        public override void Start()
+        public override async Task Start()
         {
-            base.Start();
+            await base.Start();
             // More initialization
         }
 
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index 3b37950..cc30943 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -34,14 +34,14 @@ namespace Org.Apache.Rocketmq
             _scanExpiredProcessQueueCTS = new CancellationTokenSource();
         }
 
-        public override async void Start()
+        public override async Task Start()
         {
             if (null == _messageListener)
             {
                 throw new System.Exception("Bad configuration: message listener is required");
             }
 
-            base.Start();
+            await base.Start();
 
             // Step-1: Resolve topic routes
             List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 51eb09c..44fa5fc 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -16,6 +16,7 @@
  */
 
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using grpc = global::Grpc.Core;
 using NLog;
@@ -34,6 +35,7 @@ namespace Org.Apache.Rocketmq
             this._target = target;
             this._stream = stream;
             this._client = client;
+            this._channel = Channel.CreateUnbounded<bool>();
         }
 
         public async Task Loop()
@@ -45,9 +47,9 @@ namespace Org.Apache.Rocketmq
             _client.BuildClientSetting(request.Settings);
             await writer.WriteAsync(request);
             Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
-            while (!_cts.IsCancellationRequested)
+            while (!_client.TelemetryCts().IsCancellationRequested)
             {
-                if (await reader.MoveNext(_cts.Token))
+                if (await reader.MoveNext(_client.TelemetryCts().Token))
                 {
                     var cmd = reader.Current;
                     Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
@@ -56,10 +58,19 @@ namespace Org.Apache.Rocketmq
                         case rmq::TelemetryCommand.CommandOneofCase.None:
                             {
                                 Logger.Warn($"Telemetry failed: {cmd.Status}");
+                                if (0 == Interlocked.CompareExchange(ref _established, 0, 2))
+                                {
+                                    await _channel.Writer.WriteAsync(false);
+                                }
                                 break;
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
                             {
+                                if (0 == Interlocked.CompareExchange(ref _established, 0, 1))
+                                {
+                                    await _channel.Writer.WriteAsync(true);
+                                }
+
                                 Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
                                 _client.OnSettingsReceived(cmd.Settings);
                                 break;
@@ -79,11 +90,8 @@ namespace Org.Apache.Rocketmq
                     }
                 }
             }
-        }
-
-        public void Cancel()
-        {
-            _cts.Cancel();
+            Logger.Info("Telemetry stream cancelled");
+            await writer.CompleteAsync();
         }
 
         private string _target;
@@ -93,14 +101,22 @@ namespace Org.Apache.Rocketmq
             get { return _target; }
         }
 
+        public async Task AwaitSettingNegotiationCompletion()
+        {
+            if (0 != Interlocked.Read(ref _established))
+            {
+                return;
+            }
+
+            Logger.Debug("Await setting negotiation");
+            await _channel.Reader.ReadAsync();
+        }
+
         private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
         private IClient _client;
 
-        private CancellationTokenSource _cts = new CancellationTokenSource();
+        private long _established = 0;
 
-        public CancellationTokenSource CTS
-        {
-            get { return _cts; }
-        }
+        private Channel<bool> _channel;
     };
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index afd447a..8be51d7 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -18,7 +18,6 @@
 using rmq = Apache.Rocketmq.V2;
 using NLog;
 using System.Threading.Tasks;
-using System.Collections.Generic;
 using System.Collections.Concurrent;
 using Apache.Rocketmq.V2;
 
@@ -52,10 +51,9 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public override void Start()
+        public override async Task Start()
         {
-            base.Start();
-            base.createSession(_accessPoint.TargetUrl());
+            await base.Start();
         }
 
         public override async Task Shutdown()
@@ -85,6 +83,7 @@ namespace Org.Apache.Rocketmq
             entry.Expression.Type = filterType;
             entry.Expression.Expression = expression;
             subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+            AddTopicOfInterest(topic);
         }
 
         public override void OnSettingsReceived(Settings settings)
@@ -94,8 +93,8 @@ namespace Org.Apache.Rocketmq
             if (settings.Subscription.Fifo)
             {
                 fifo_ = true;
+                Logger.Info($"#OnSettingsReceived: Group {Group} is FIFO");
             }
-
         }
 
         private string group_;
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index baeca17..faf0f45 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -46,7 +46,7 @@ namespace Org.Apache.Rocketmq
             var producer = new Producer(accessPoint, resourceNamespace);
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
-            producer.Start();
+            await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             var msg = new Message(topic, body);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 29f155f..70bcdd3 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -30,7 +30,8 @@ namespace Org.Apache.Rocketmq
         public async Task TestStart()
         {
             var accessPoint = new AccessPoint();
-            var host = "11.166.42.94";
+            // var host = "11.166.42.94";
+            var host = "127.0.0.1";
             var port = 8081;
             accessPoint.Host = host;
             accessPoint.Port = port;
@@ -40,8 +41,8 @@ namespace Org.Apache.Rocketmq
 
             var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
             simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
-            simpleConsumer.Start();
-            Thread.Sleep(10_000);
+            await simpleConsumer.Start();
+            Thread.Sleep(1_000);
             await simpleConsumer.Shutdown();
         }