You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:17 UTC
[rocketmq-client-csharp] 03/04: WIP
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit 86083310e94f41a27d470f6b8d63b27394e63168
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 16:44:26 2022 +0800
WIP
---
rocketmq-client-csharp/Client.cs | 15 +++++++++++++++
rocketmq-client-csharp/IClient.cs | 1 +
rocketmq-client-csharp/Producer.cs | 2 ++
rocketmq-client-csharp/Session.cs | 2 +-
rocketmq-client-csharp/SimpleConsumer.cs | 16 ++++++++++++++++
5 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 1eb368e..607daf4 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -398,6 +398,21 @@ namespace Org.Apache.Rocketmq
return true;
}
+ public virtual void OnReceive(rmq::Settings settings)
+ {
+ if (null != settings.Metric)
+ {
+ _clientSettings.Metric = new rmq::Metric();
+ _clientSettings.Metric.MergeFrom(settings.Metric);
+ }
+
+ if (null != settings.BackoffPolicy)
+ {
+ _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+ _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+ }
+ }
+
protected readonly IClientManager Manager;
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index abdcc21..4b7206b 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,5 +31,6 @@ namespace Org.Apache.Rocketmq
void BuildClientSetting(rmq::Settings settings);
+ void OnReceive(rmq::Settings settings);
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index bfcc1d3..32606ae 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -47,7 +47,9 @@ namespace Org.Apache.Rocketmq
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
+ request.ClientType = rmq::ClientType.Producer;
+ // Concept of ProducerGroup has been removed.
}
public async Task<SendReceipt> Send(Message message)
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 3e234f2..f5e7795 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -60,8 +60,8 @@ namespace Org.Apache.Rocketmq
}
case rmq::TelemetryCommand.CommandOneofCase.Settings:
{
-
Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+ _client.OnReceive(cmd.Settings);
break;
}
case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 4c447c9..9eaf365 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -19,6 +19,7 @@ using rmq = Apache.Rocketmq.V2;
using NLog;
using System.Collections.Generic;
using System.Collections.Concurrent;
+using Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -63,6 +64,10 @@ namespace Org.Apache.Rocketmq
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
+ request.ClientType = rmq::ClientType.SimpleConsumer;
+ request.Group = new rmq::Resource();
+ request.Group.Name = Group;
+ request.Group.ResourceNamespace = ResourceNamespace;
}
public void Subscribe(string topic, rmq::FilterType filterType, string expression)
@@ -77,6 +82,17 @@ namespace Org.Apache.Rocketmq
subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
}
+ public override void OnReceive(Settings settings)
+ {
+ base.OnReceive(settings);
+
+ if (settings.Subscription.Fifo)
+ {
+ fifo_ = true;
+ }
+
+ }
+
private string group_;
public string Group