You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/23 05:52:47 UTC

[rocketmq-client-csharp] branch observability updated: WIP

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new d1c60b3  WIP
d1c60b3 is described below

commit d1c60b34d01fee4839c988c111ff6a4ff110a6af
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 23 13:52:36 2022 +0800

    WIP
---
 rocketmq-client-csharp/Client.cs         |   8 +--
 rocketmq-client-csharp/Session.cs        |   2 +-
 rocketmq-client-csharp/SimpleConsumer.cs | 101 ++++++++++++++++++++++++-------
 3 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index b2f9369..14d9b5b 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -479,10 +479,10 @@ namespace Org.Apache.Rocketmq
         protected readonly AccessPoint _accessPoint;
 
         // This field is subject to changes from servers.
-        protected rmq::Settings _clientSettings;
+        protected readonly rmq::Settings _clientSettings;
 
-        private Random random = new Random();
-
-        private ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+        private readonly Random random = new Random();
+        
+        protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 44fa5fc..a6be057 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -24,7 +24,7 @@ using rmq = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
-    class Session
+    public class Session
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 8be51d7..31d6e80 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
+using System;
 using rmq = Apache.Rocketmq.V2;
 using NLog;
 using System.Threading.Tasks;
 using System.Collections.Concurrent;
-using Apache.Rocketmq.V2;
+using System.Threading;
+using Grpc.Core;
+using System.Collections.Generic;
 
 namespace Org.Apache.Rocketmq
 {
@@ -30,9 +33,10 @@ namespace Org.Apache.Rocketmq
         string resourceNamespace, string group)
         : base(accessPoint, resourceNamespace)
         {
-            fifo_ = false;
-            subscriptions_ = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
-            group_ = group;
+            _fifo = false;
+            _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+            _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+            _group = group;
         }
 
         public override void BuildClientSetting(rmq::Settings settings)
@@ -42,18 +46,29 @@ namespace Org.Apache.Rocketmq
             settings.ClientType = rmq::ClientType.SimpleConsumer;
             settings.Subscription = new rmq::Subscription();
             settings.Subscription.Group = new rmq::Resource();
-            settings.Subscription.Group.Name = Group;
+            settings.Subscription.Group.Name = _group;
             settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
 
-            foreach (var entry in subscriptions_)
+            foreach (var kv in _subscriptions)
             {
-                settings.Subscription.Subscriptions.Add(entry.Value);
+                settings.Subscription.Subscriptions.Add(kv.Value);
             }
         }
 
         public override async Task Start()
         {
             await base.Start();
+            
+            // Scan load assignment periodically
+            schedule(async () =>
+            {
+                while (!_scanAssignmentCts.IsCancellationRequested)
+                {
+                    await ScanLoadAssignments();                    
+                }
+            }, 30, _scanAssignmentCts.Token);
+
+            await ScanLoadAssignments();
         }
 
         public override async Task Shutdown()
@@ -65,11 +80,57 @@ namespace Org.Apache.Rocketmq
             }
         }
 
+        private async Task ScanLoadAssignments()
+        {
+
+            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
+            List<string> topics = new List<string>();
+            foreach (var sub in _subscriptions)
+            {
+                var request = new rmq::QueryAssignmentRequest();
+                request.Topic = new rmq::Resource();
+                request.Topic.ResourceNamespace = ResourceNamespace;
+                request.Topic.Name = sub.Key;
+                topics.Add(sub.Key);
+                request.Group = new rmq::Resource();
+                request.Group.Name = _group;
+                request.Group.ResourceNamespace = ResourceNamespace;
+
+                request.Endpoints = new rmq::Endpoints();
+                request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
+                var address = new rmq::Address();
+                address.Host = _accessPoint.Host;
+                address.Port = _accessPoint.Port;
+                request.Endpoints.Addresses.Add(address);
+
+                var metadata = new Metadata();
+                Signature.sign(this, metadata);
+                tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+            }
+
+            List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
+
+            var i = 0;
+            foreach (var assignments in list)
+            {
+                string topic = topics[i];
+                if (null == assignments || 0 == assignments.Count)
+                {
+                    Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
+                    ++i;
+                    continue;
+                }
+                Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
+                _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
+                ++i;
+            }
+        }
+
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
             request.ClientType = rmq::ClientType.SimpleConsumer;
             request.Group = new rmq::Resource();
-            request.Group.Name = Group;
+            request.Group.Name = _group;
             request.Group.ResourceNamespace = ResourceNamespace;
         }
 
@@ -82,31 +143,25 @@ namespace Org.Apache.Rocketmq
             entry.Expression = new rmq::FilterExpression();
             entry.Expression.Type = filterType;
             entry.Expression.Expression = expression;
-            subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+            _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
             AddTopicOfInterest(topic);
         }
 
-        public override void OnSettingsReceived(Settings settings)
+        public override void OnSettingsReceived(rmq.Settings settings)
         {
             base.OnSettingsReceived(settings);
 
             if (settings.Subscription.Fifo)
             {
-                fifo_ = true;
-                Logger.Info($"#OnSettingsReceived: Group {Group} is FIFO");
+                _fifo = true;
+                Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
             }
         }
 
-        private string group_;
-
-        public string Group
-        {
-            get { return group_; }
-        }
-
-        private bool fifo_;
-
-        private ConcurrentDictionary<string, rmq::SubscriptionEntry> subscriptions_;
-
+        private readonly string _group;
+        private bool _fifo;
+        private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
+        private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
+        private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
     }
 }
\ No newline at end of file