You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:17 UTC

[rocketmq-client-csharp] 03/04: WIP

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit 86083310e94f41a27d470f6b8d63b27394e63168
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 16:44:26 2022 +0800

    WIP
---
 rocketmq-client-csharp/Client.cs         | 15 +++++++++++++++
 rocketmq-client-csharp/IClient.cs        |  1 +
 rocketmq-client-csharp/Producer.cs       |  2 ++
 rocketmq-client-csharp/Session.cs        |  2 +-
 rocketmq-client-csharp/SimpleConsumer.cs | 16 ++++++++++++++++
 5 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 1eb368e..607daf4 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -398,6 +398,21 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
+        public virtual void OnReceive(rmq::Settings settings)
+        {
+            if (null != settings.Metric)
+            {
+                _clientSettings.Metric = new rmq::Metric();
+                _clientSettings.Metric.MergeFrom(settings.Metric);
+            }
+
+            if (null != settings.BackoffPolicy)
+            {
+                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+            }
+        }
+
         protected readonly IClientManager Manager;
 
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index abdcc21..4b7206b 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,5 +31,6 @@ namespace Org.Apache.Rocketmq
         void BuildClientSetting(rmq::Settings settings);
 
 
+        void OnReceive(rmq::Settings settings);
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index bfcc1d3..32606ae 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -47,7 +47,9 @@ namespace Org.Apache.Rocketmq
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.Producer;
 
+            // Concept of ProducerGroup has been removed.
         }
 
         public async Task<SendReceipt> Send(Message message)
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 3e234f2..f5e7795 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -60,8 +60,8 @@ namespace Org.Apache.Rocketmq
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
                             {
-
                                 Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                                _client.OnReceive(cmd.Settings);
                                 break;
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 4c447c9..9eaf365 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -19,6 +19,7 @@ using rmq = Apache.Rocketmq.V2;
 using NLog;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
+using Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
@@ -63,6 +64,10 @@ namespace Org.Apache.Rocketmq
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.SimpleConsumer;
+            request.Group = new rmq::Resource();
+            request.Group.Name = Group;
+            request.Group.ResourceNamespace = ResourceNamespace;
         }
 
         public void Subscribe(string topic, rmq::FilterType filterType, string expression)
@@ -77,6 +82,17 @@ namespace Org.Apache.Rocketmq
             subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
         }
 
+        public override void OnReceive(Settings settings)
+        {
+            base.OnReceive(settings);
+
+            if (settings.Subscription.Fifo)
+            {
+                fifo_ = true;
+            }
+
+        }
+
         private string group_;
 
         public string Group