You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/23 05:52:47 UTC
[rocketmq-client-csharp] branch observability updated: WIP
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new d1c60b3 WIP
d1c60b3 is described below
commit d1c60b34d01fee4839c988c111ff6a4ff110a6af
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 23 13:52:36 2022 +0800
WIP
---
rocketmq-client-csharp/Client.cs | 8 +--
rocketmq-client-csharp/Session.cs | 2 +-
rocketmq-client-csharp/SimpleConsumer.cs | 101 ++++++++++++++++++++++++-------
3 files changed, 83 insertions(+), 28 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index b2f9369..14d9b5b 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -479,10 +479,10 @@ namespace Org.Apache.Rocketmq
protected readonly AccessPoint _accessPoint;
// This field is subject changes from servers.
- protected rmq::Settings _clientSettings;
+ protected readonly rmq::Settings _clientSettings;
- private Random random = new Random();
-
- private ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+ private readonly Random random = new Random();
+
+ protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 44fa5fc..a6be057 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -24,7 +24,7 @@ using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
- class Session
+ public class Session
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 8be51d7..31d6e80 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -15,11 +15,14 @@
* limitations under the License.
*/
+using System;
using rmq = Apache.Rocketmq.V2;
using NLog;
using System.Threading.Tasks;
using System.Collections.Concurrent;
-using Apache.Rocketmq.V2;
+using System.Threading;
+using Grpc.Core;
+using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
@@ -30,9 +33,10 @@ namespace Org.Apache.Rocketmq
string resourceNamespace, string group)
: base(accessPoint, resourceNamespace)
{
- fifo_ = false;
- subscriptions_ = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
- group_ = group;
+ _fifo = false;
+ _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+ _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+ _group = group;
}
public override void BuildClientSetting(rmq::Settings settings)
@@ -42,18 +46,29 @@ namespace Org.Apache.Rocketmq
settings.ClientType = rmq::ClientType.SimpleConsumer;
settings.Subscription = new rmq::Subscription();
settings.Subscription.Group = new rmq::Resource();
- settings.Subscription.Group.Name = Group;
+ settings.Subscription.Group.Name = _group;
settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
- foreach (var entry in subscriptions_)
+ foreach (var kv in _subscriptions)
{
- settings.Subscription.Subscriptions.Add(entry.Value);
+ settings.Subscription.Subscriptions.Add(kv.Value);
}
}
public override async Task Start()
{
await base.Start();
+
+ // Scan load assignment periodically
+ schedule(async () =>
+ {
+ while (!_scanAssignmentCts.IsCancellationRequested)
+ {
+ await ScanLoadAssignments();
+ }
+ }, 30, _scanAssignmentCts.Token);
+
+ await ScanLoadAssignments();
}
public override async Task Shutdown()
@@ -65,11 +80,57 @@ namespace Org.Apache.Rocketmq
}
}
+ private async Task ScanLoadAssignments()
+ {
+
+ List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
+ List<string> topics = new List<string>();
+ foreach (var sub in _subscriptions)
+ {
+ var request = new rmq::QueryAssignmentRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = ResourceNamespace;
+ request.Topic.Name = sub.Key;
+ topics.Add(sub.Key);
+ request.Group = new rmq::Resource();
+ request.Group.Name = _group;
+ request.Group.ResourceNamespace = ResourceNamespace;
+
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Host = _accessPoint.Host;
+ address.Port = _accessPoint.Port;
+ request.Endpoints.Addresses.Add(address);
+
+ var metadata = new Metadata();
+ Signature.sign(this, metadata);
+ tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+ }
+
+ List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
+
+ var i = 0;
+ foreach (var assignments in list)
+ {
+ string topic = topics[i];
+ if (null == assignments || 0 == assignments.Count)
+ {
+ Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
+ ++i;
+ continue;
+ }
+ Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
+ _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
+ ++i;
+ }
+ }
+
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
request.ClientType = rmq::ClientType.SimpleConsumer;
request.Group = new rmq::Resource();
- request.Group.Name = Group;
+ request.Group.Name = _group;
request.Group.ResourceNamespace = ResourceNamespace;
}
@@ -82,31 +143,25 @@ namespace Org.Apache.Rocketmq
entry.Expression = new rmq::FilterExpression();
entry.Expression.Type = filterType;
entry.Expression.Expression = expression;
- subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+ _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
AddTopicOfInterest(topic);
}
- public override void OnSettingsReceived(Settings settings)
+ public override void OnSettingsReceived(rmq.Settings settings)
{
base.OnSettingsReceived(settings);
if (settings.Subscription.Fifo)
{
- fifo_ = true;
- Logger.Info($"#OnSettingsReceived: Group {Group} is FIFO");
+ _fifo = true;
+ Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
}
}
- private string group_;
-
- public string Group
- {
- get { return group_; }
- }
-
- private bool fifo_;
-
- private ConcurrentDictionary<string, rmq::SubscriptionEntry> subscriptions_;
-
+ private readonly string _group;
+ private bool _fifo;
+ private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
+ private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
+ private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
}
}
\ No newline at end of file