You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:14 UTC

[rocketmq-client-csharp] branch observability created (now a6b1ee9)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


      at a6b1ee9  Make Shutdown async

This branch includes the following new commits:

     new 12a13e8  Add package OpenTelemetry and Opentelemetry.API
     new 4433efa  WIP: write unit tests
     new 8608331  WIP
     new a6b1ee9  Make Shutdown async

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-client-csharp] 01/04: Add package OpenTelemetry and Opentelemetry.API

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit 12a13e87e1324e65dc618b01c811fe22c1fc6c92
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jun 17 10:09:43 2022 +0800

    Add package OpenTelemetry and Opentelemetry.API
---
 rocketmq-client-csharp/rocketmq-client-csharp.csproj | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 094e1fa..cc81ff7 100644
--- a/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -19,6 +19,8 @@
       <PrivateAssets>all</PrivateAssets>
     </PackageReference>
     <PackageReference Include="NLog" Version="4.7.13" />
+    <PackageReference Include="OpenTelemetry" Version="1.3.0" />
+    <PackageReference Include="OpenTelemetry.Api" Version="1.3.0" />
 
     <Protobuf Include="Protos\apache\rocketmq\v2\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />
     <Protobuf Include="Protos\google\rpc\code.proto" ProtoRoot="Protos" GrpcServices="Client" />


[rocketmq-client-csharp] 02/04: WIP: write unit tests

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit 4433efa4b1710dc0c3116d89f4f957c02c31bdc3
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 11:15:30 2022 +0800

    WIP: write unit tests
---
 examples/Program.cs                                | 15 ++--
 rocketmq-client-csharp/AccessPoint.cs              |  5 ++
 rocketmq-client-csharp/Client.cs                   | 28 ++++++-
 rocketmq-client-csharp/ClientConfig.cs             | 66 ++++++++++------
 rocketmq-client-csharp/ClientManager.cs            |  2 +-
 .../ConfigFileCredentialsProvider.cs               | 30 ++++---
 rocketmq-client-csharp/Credentials.cs              | 32 +++++---
 rocketmq-client-csharp/IClient.cs                  |  4 +-
 rocketmq-client-csharp/IClientConfig.cs            |  6 +-
 rocketmq-client-csharp/IClientManager.cs           |  6 +-
 rocketmq-client-csharp/ICredentialsProvider.cs     |  6 +-
 rocketmq-client-csharp/IProducer.cs                |  6 +-
 rocketmq-client-csharp/Message.cs                  | 32 +++++---
 rocketmq-client-csharp/MessageType.cs              |  3 +-
 rocketmq-client-csharp/MetadataConstants.cs        |  6 +-
 rocketmq-client-csharp/Producer.cs                 |  2 +-
 rocketmq-client-csharp/PushConsumer.cs             |  2 +-
 rocketmq-client-csharp/RpcClient.cs                |  3 +-
 rocketmq-client-csharp/SendStatus.cs               |  6 +-
 rocketmq-client-csharp/Session.cs                  | 39 +++++----
 rocketmq-client-csharp/Signature.cs                | 26 +++---
 rocketmq-client-csharp/SimpleConsumer.cs           | 92 ++++++++++++++++++++++
 .../StaticCredentialsProvider.cs                   | 12 ++-
 tests/ClientConfigTest.cs                          |  9 ++-
 tests/ClientManagerTest.cs                         | 11 ++-
 tests/ConfigFileCredentialsProviderTest.cs         |  9 ++-
 tests/DateTimeTest.cs                              | 17 ++--
 tests/MessageTest.cs                               | 21 +++--
 tests/PushConsumerTest.cs                          | 10 ++-
 tests/SendResultTest.cs                            | 14 ++--
 tests/SignatureTest.cs                             |  8 +-
 tests/{SendResultTest.cs => SimpleConsumerTest.cs} | 39 +++++----
 tests/StaticCredentialsProviderTest.cs             |  9 ++-
 tests/TopicTest.cs                                 | 23 +++---
 tests/UnitTest1.cs                                 | 46 +++++------
 35 files changed, 447 insertions(+), 198 deletions(-)

diff --git a/examples/Program.cs b/examples/Program.cs
index d96f41e..09a1674 100644
--- a/examples/Program.cs
+++ b/examples/Program.cs
@@ -5,20 +5,24 @@ using System.Threading;
 namespace examples
 {
 
-    class Foo {
+    class Foo
+    {
         public int bar = 1;
     }
     class Program
     {
 
-        static void RT(Action action, int seconds, CancellationToken token) {
-            if (null == action) {
+        static void RT(Action action, int seconds, CancellationToken token)
+        {
+            if (null == action)
+            {
                 return;
             }
 
             Task.Run(async () =>
             {
-                while(!token.IsCancellationRequested) {
+                while (!token.IsCancellationRequested)
+                {
                     action();
                     await Task.Delay(TimeSpan.FromSeconds(seconds), token);
                 }
@@ -44,7 +48,8 @@ namespace examples
             ThreadPool.QueueUserWorkItem((Object stateInfo) =>
             {
                 Console.WriteLine("From ThreadPool");
-                if (stateInfo is Foo) {
+                if (stateInfo is Foo)
+                {
                     Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar);
                 }
             }, new Foo());
diff --git a/rocketmq-client-csharp/AccessPoint.cs b/rocketmq-client-csharp/AccessPoint.cs
index f97d216..cf4e1f4 100644
--- a/rocketmq-client-csharp/AccessPoint.cs
+++ b/rocketmq-client-csharp/AccessPoint.cs
@@ -33,5 +33,10 @@ namespace Org.Apache.Rocketmq
             get { return _port; }
             set { _port = value; }
         }
+
+        public string TargetUrl()
+        {
+            return $"https://{_host}:{_port}";
+        }
     }
 }
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 1d32095..1eb368e 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -33,15 +33,31 @@ namespace Org.Apache.Rocketmq
 
         protected Client(AccessPoint accessPoint, string resourceNamespace)
         {
+            _accessPoint = accessPoint;
+
             // Support IPv4 for now
             AccessPointScheme = rmq::AddressScheme.Ipv4;
-
             var serviceEndpoint = new rmq::Address();
             serviceEndpoint.Host = accessPoint.Host;
             serviceEndpoint.Port = accessPoint.Port;
             AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
 
             _resourceNamespace = resourceNamespace;
+
+            _clientSettings = new rmq::Settings();
+
+            _clientSettings.AccessPoint = new rmq::Endpoints();
+            _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+
+            _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+
+            _clientSettings.UserAgent = new rmq.UA();
+            _clientSettings.UserAgent.Language = rmq::Language.DotNet;
+            _clientSettings.UserAgent.Version = "5.0.0";
+            _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
+            _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+
             Manager = ClientManagerFactory.getClientManager(resourceNamespace);
 
             _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
@@ -283,9 +299,9 @@ namespace Org.Apache.Rocketmq
             return $"https://{address.Host}:{address.Port}";
         }
 
-        public void buildClientSetting(rmq::Settings settings)
+        public virtual void BuildClientSetting(rmq::Settings settings)
         {
-
+            settings.MergeFrom(_clientSettings);
         }
 
         public void createSession(string url)
@@ -388,6 +404,12 @@ namespace Org.Apache.Rocketmq
         private readonly CancellationTokenSource _updateTopicRouteCts;
 
         private readonly CancellationTokenSource _healthCheckCts;
+
+        protected readonly AccessPoint _accessPoint;
+
+        // This field is subject changes from servers.
+        protected rmq::Settings _clientSettings;
+
         private Random random = new Random();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 6dc3eba..0d99cb1 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -18,11 +18,14 @@ using System;
 using System.Collections.Generic;
 using rmq = Apache.Rocketmq.V2;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
-    public class ClientConfig : IClientConfig {
+    public class ClientConfig : IClientConfig
+    {
 
-        public ClientConfig() {
+        public ClientConfig()
+        {
             var hostName = System.Net.Dns.GetHostName();
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
             this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
@@ -34,39 +37,50 @@ namespace Org.Apache.Rocketmq {
             this._publishing = new Publishing();
         }
 
-        public string region() {
+        public string region()
+        {
             return _region;
         }
-        public string Region {
+        public string Region
+        {
             set { _region = value; }
         }
 
-        public string serviceName() {
+        public string serviceName()
+        {
             return _serviceName;
         }
-        public string ServiceName {
+        public string ServiceName
+        {
             set { _serviceName = value; }
         }
 
-        public string resourceNamespace() {
+        public string resourceNamespace()
+        {
             return _resourceNamespace;
         }
-        public string ResourceNamespace {
+        public string ResourceNamespace
+        {
+            get { return _resourceNamespace; }
             set { _resourceNamespace = value; }
         }
 
-        public ICredentialsProvider credentialsProvider() {
+        public ICredentialsProvider credentialsProvider()
+        {
             return credentialsProvider_;
         }
-        
-        public ICredentialsProvider CredentialsProvider {
+
+        public ICredentialsProvider CredentialsProvider
+        {
             set { credentialsProvider_ = value; }
         }
 
-        public string tenantId() {
+        public string tenantId()
+        {
             return _tenantId;
         }
-        public string TenantId {
+        public string TenantId
+        {
             set { _tenantId = value; }
         }
 
@@ -82,32 +96,40 @@ namespace Org.Apache.Rocketmq {
             }
         }
 
-        public TimeSpan getLongPollingTimeout() {
+        public TimeSpan getLongPollingTimeout()
+        {
             return longPollingIoTimeout_;
         }
-        public TimeSpan LongPollingTimeout {
+        public TimeSpan LongPollingTimeout
+        {
             set { longPollingIoTimeout_ = value; }
         }
 
-        public string getGroupName() {
+        public string getGroupName()
+        {
             return groupName_;
         }
-        public string GroupName {
+        public string GroupName
+        {
             set { groupName_ = value; }
         }
 
-        public string clientId() {
+        public string clientId()
+        {
             return clientId_;
         }
 
-        public bool isTracingEnabled() {
+        public bool isTracingEnabled()
+        {
             return tracingEnabled_;
         }
-        public bool TracingEnabled {
+        public bool TracingEnabled
+        {
             set { tracingEnabled_ = value; }
         }
 
-        public void setInstanceName(string instanceName) {
+        public void setInstanceName(string instanceName)
+        {
             this.instanceName_ = instanceName;
         }
 
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index a0b377b..54ceff2 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -152,7 +152,7 @@ namespace Org.Apache.Rocketmq
             // TODO: 
 
             List<Message> messages = new List<Message>();
-            
+
             return messages;
         }
 
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 9a3baa3..39dfd7e 100644
--- a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -19,36 +19,46 @@ using System;
 using System.Text.Json;
 using System.Collections.Generic;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     /**
      * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
      * A sample config content is as follows:
      * {"AccessKey": "key", "AccessSecret": "secret"}
      */
-    public class ConfigFileCredentialsProvider : ICredentialsProvider {
+    public class ConfigFileCredentialsProvider : ICredentialsProvider
+    {
 
-        public ConfigFileCredentialsProvider() {
+        public ConfigFileCredentialsProvider()
+        {
             var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
             string configFileRelativePath = "/.rocketmq/config";
-            if (!File.Exists(home + configFileRelativePath)) {
+            if (!File.Exists(home + configFileRelativePath))
+            {
                 return;
             }
 
-            try {
-                using (var reader = new StreamReader(home + configFileRelativePath)) {
+            try
+            {
+                using (var reader = new StreamReader(home + configFileRelativePath))
+                {
                     string json = reader.ReadToEnd();
                     var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
                     accessKey = kv["AccessKey"];
                     accessSecret = kv["AccessSecret"];
                     valid = true;
-                } 
-            } catch (IOException e) {
+                }
+            }
+            catch (IOException)
+            {
             }
         }
 
-        public Credentials getCredentials() {
-            if (!valid) {
+        public Credentials getCredentials()
+        {
+            if (!valid)
+            {
                 return null;
             }
 
diff --git a/rocketmq-client-csharp/Credentials.cs b/rocketmq-client-csharp/Credentials.cs
index 2ccafc8..a73b000 100644
--- a/rocketmq-client-csharp/Credentials.cs
+++ b/rocketmq-client-csharp/Credentials.cs
@@ -17,27 +17,34 @@
 
 using System;
 
-namespace Org.Apache.Rocketmq {
-    public class Credentials {
+namespace Org.Apache.Rocketmq
+{
+    public class Credentials
+    {
 
-        public Credentials(string accessKey, string accessSecret) {
+        public Credentials(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) {
+        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
             this.sessionToken = sessionToken;
             this.expirationInstant = expirationInstant;
         }
 
-        public bool empty() {
+        public bool empty()
+        {
             return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret);
         }
 
-        public bool expired() {
-            if (DateTime.MinValue == expirationInstant) {
+        public bool expired()
+        {
+            if (DateTime.MinValue == expirationInstant)
+            {
                 return false;
             }
 
@@ -45,17 +52,20 @@ namespace Org.Apache.Rocketmq {
         }
 
         private string accessKey;
-        public string AccessKey {
+        public string AccessKey
+        {
             get { return accessKey; }
         }
-        
+
         private string accessSecret;
-        public string AccessSecret {
+        public string AccessSecret
+        {
             get { return accessSecret; }
         }
 
         private string sessionToken;
-        public string SessionToken {
+        public string SessionToken
+        {
             get { return sessionToken; }
         }
 
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index f4115a2..abdcc21 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -28,6 +28,8 @@ namespace Org.Apache.Rocketmq
 
         Task<bool> NotifyClientTermination();
 
-        void buildClientSetting(rmq::Settings settings);
+        void BuildClientSetting(rmq::Settings settings);
+
+
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index 3726ac4..438d7a8 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -16,8 +16,10 @@
  */
 using System;
 
-namespace Org.Apache.Rocketmq {
-    public interface IClientConfig {
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientConfig
+    {
         string region();
 
         string serviceName();
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index d5c3ea3..afccfde 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -22,8 +22,10 @@ using grpc = global::Grpc.Core;
 using rmq = Apache.Rocketmq.V2;
 
 
-namespace Org.Apache.Rocketmq {
-    public interface IClientManager {
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientManager
+    {
         IRpcClient GetRpcClient(string target);
 
         grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
diff --git a/rocketmq-client-csharp/ICredentialsProvider.cs b/rocketmq-client-csharp/ICredentialsProvider.cs
index 80e908f..1fb892b 100644
--- a/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace Org.Apache.Rocketmq {
-    public interface ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public interface ICredentialsProvider
+    {
         Credentials getCredentials();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index cbb82d4..9c30c6c 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -17,8 +17,10 @@
 
 using System.Threading.Tasks;
 
-namespace Org.Apache.Rocketmq {
-    public interface IProducer {
+namespace Org.Apache.Rocketmq
+{
+    public interface IProducer
+    {
         void Start();
 
         void Shutdown();
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 5cbf1aa..b8b0e98 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -19,16 +19,20 @@ using System.Collections.Generic;
 namespace Org.Apache.Rocketmq
 {
 
-    public class Message {
-        public Message() : this(null, null) {
+    public class Message
+    {
+        public Message() : this(null, null)
+        {
         }
 
-        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
 
-        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
+        {
         }
 
-        public Message(string topic, string tag, List<string> keys, byte[] body) {
+        public Message(string topic, string tag, List<string> keys, byte[] body)
+        {
             this.messageId = SequenceGenerator.Instance.Next();
             this.maxAttemptTimes = 3;
             this.topic = topic;
@@ -58,37 +62,43 @@ namespace Org.Apache.Rocketmq
 
         private string topic;
 
-        public string Topic {
+        public string Topic
+        {
             get { return topic; }
             set { this.topic = value; }
         }
 
         private byte[] body;
-        public byte[] Body {
+        public byte[] Body
+        {
             get { return body; }
             set { this.body = value; }
         }
 
         private string tag;
-        public string Tag {
+        public string Tag
+        {
             get { return tag; }
             set { this.tag = value; }
         }
 
         private List<string> keys;
-        public List<string> Keys{
+        public List<string> Keys
+        {
             get { return keys; }
             set { this.keys = value; }
         }
 
         private Dictionary<string, string> userProperties;
-        public Dictionary<string, string> UserProperties {
+        public Dictionary<string, string> UserProperties
+        {
             get { return userProperties; }
             set { this.userProperties = value; }
         }
 
         private Dictionary<string, string> systemProperties;
-        internal Dictionary<string, string> SystemProperties {
+        internal Dictionary<string, string> SystemProperties
+        {
             get { return systemProperties; }
             set { this.systemProperties = value; }
         }
diff --git a/rocketmq-client-csharp/MessageType.cs b/rocketmq-client-csharp/MessageType.cs
index 8373496..a459e93 100644
--- a/rocketmq-client-csharp/MessageType.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -17,7 +17,8 @@
 
 namespace Org.Apache.Rocketmq
 {
-    public enum MessageType {
+    public enum MessageType
+    {
         Normal,
         Fifo,
         Delay,
diff --git a/rocketmq-client-csharp/MetadataConstants.cs b/rocketmq-client-csharp/MetadataConstants.cs
index 33bd66e..5381595 100644
--- a/rocketmq-client-csharp/MetadataConstants.cs
+++ b/rocketmq-client-csharp/MetadataConstants.cs
@@ -17,8 +17,10 @@
 
 using System;
 
-namespace Org.Apache.Rocketmq {
-    public class MetadataConstants {
+namespace Org.Apache.Rocketmq
+{
+    public class MetadataConstants
+    {
         public const string TENANT_ID_KEY = "x-mq-tenant-id";
         public const string NAMESPACE_KEY = "x-mq-namespace";
         public const string AUTHORIZATION = "authorization";
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index ccddc8c..bfcc1d3 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -139,6 +139,6 @@ namespace Org.Apache.Rocketmq
         }
 
         private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
-        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private static new readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index def9be4..909e7a2 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -201,7 +201,7 @@ namespace Org.Apache.Rocketmq
                         }
                     }
                 }
-                catch (System.Exception e)
+                catch (System.Exception)
                 {
                     // TODO: log exception raised.
                 }
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index f56a07f..e0a2caf 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -125,7 +125,8 @@ namespace Org.Apache.Rocketmq
             var callOptions = new CallOptions(metadata, deadline);
             var call = _stub.ReceiveMessage(request, callOptions);
             var result = new List<rmq::ReceiveMessageResponse>();
-            while(await call.ResponseStream.MoveNext()) {
+            while (await call.ResponseStream.MoveNext())
+            {
                 result.Add(call.ResponseStream.Current);
             }
             return result;
diff --git a/rocketmq-client-csharp/SendStatus.cs b/rocketmq-client-csharp/SendStatus.cs
index b20e1c5..7586d22 100644
--- a/rocketmq-client-csharp/SendStatus.cs
+++ b/rocketmq-client-csharp/SendStatus.cs
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-namespace Org.Apache.Rocketmq {
-    public enum SendStatus {
+namespace Org.Apache.Rocketmq
+{
+    public enum SendStatus
+    {
         SEND_OK,
         FLUSH_DISK_TIMEOUT,
         FLUSH_SLAVE_TIMEOUT,
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index dc13dc9..3e234f2 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -33,6 +33,7 @@ namespace Org.Apache.Rocketmq
         {
             this._target = target;
             this._stream = stream;
+            this._client = client;
         }
 
         public async Task Loop()
@@ -41,36 +42,40 @@ namespace Org.Apache.Rocketmq
             var writer = this._stream.RequestStream;
             var request = new rmq::TelemetryCommand();
             request.Settings = new rmq::Settings();
-            _client.buildClientSetting(request.Settings);
+            _client.BuildClientSetting(request.Settings);
             await writer.WriteAsync(request);
+            Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
             while (!_cts.IsCancellationRequested)
             {
                 if (await reader.MoveNext(_cts.Token))
                 {
                     var cmd = reader.Current;
+                    Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
                     switch (cmd.CommandCase)
                     {
                         case rmq::TelemetryCommand.CommandOneofCase.None:
-                        {
-                            Logger.Warn($"Telemetry failed: {cmd.Status}");
-                            break;
-                        }
+                            {
+                                Logger.Warn($"Telemetry failed: {cmd.Status}");
+                                break;
+                            }
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
-                        {
-                            break;
-                        }
+                            {
+
+                                Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                                break;
+                            }
                         case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
-                        {
-                            break;
-                        }
+                            {
+                                break;
+                            }
                         case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
-                        {
-                            break;
-                        }
+                            {
+                                break;
+                            }
                         case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
-                        {
-                            break;
-                        }
+                            {
+                                break;
+                            }
                     }
                 }
             }
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index c249253..2331b53 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -19,30 +19,38 @@ using System.Text;
 using grpc = global::Grpc.Core;
 using System.Security.Cryptography;
 
-namespace Org.Apache.Rocketmq {
-    public class Signature {
-        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
+namespace Org.Apache.Rocketmq
+{
+    public class Signature
+    {
+        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+        {
             metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
             metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
-            if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
+            if (!String.IsNullOrEmpty(clientConfig.tenantId()))
+            {
                 metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
             }
 
-            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) {
+            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
+            {
                 metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace());
             }
 
             string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT);
             metadata.Add(MetadataConstants.DATE_TIME_KEY, time);
 
-            if (null != clientConfig.credentialsProvider()) {
+            if (null != clientConfig.credentialsProvider())
+            {
                 var credentials = clientConfig.credentialsProvider().getCredentials();
-                if (null == credentials || credentials.expired()) {
+                if (null == credentials || credentials.expired())
+                {
                     return;
                 }
 
-                if (!String.IsNullOrEmpty(credentials.SessionToken)) {
+                if (!String.IsNullOrEmpty(credentials.SessionToken))
+                {
                     metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken);
                 }
 
@@ -51,7 +59,7 @@ namespace Org.Apache.Rocketmq {
                 HMACSHA1 signer = new HMACSHA1(secretData);
                 byte[] digest = signer.ComputeHash(data);
                 string hmac = BitConverter.ToString(digest).Replace("-", "");
-                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}", 
+                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
                     MetadataConstants.ALGORITHM_KEY,
                     MetadataConstants.CREDENTIAL_KEY,
                     credentials.AccessKey,
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 0000000..4c447c9
--- /dev/null
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using rmq = Apache.Rocketmq.V2;
+using NLog;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace Org.Apache.Rocketmq
+{
+    public class SimpleConsumer : Client
+    {
+
+        public SimpleConsumer(AccessPoint accessPoint,
+        string resourceNamespace, string group)
+        : base(accessPoint, resourceNamespace)
+        {
+            fifo_ = false;
+            subscriptions_ = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+            group_ = group;
+        }
+
+        public override void BuildClientSetting(rmq::Settings settings)
+        {
+            base.BuildClientSetting(settings);
+
+            settings.ClientType = rmq::ClientType.SimpleConsumer;
+            settings.Subscription = new rmq::Subscription();
+            settings.Subscription.Group = new rmq::Resource();
+            settings.Subscription.Group.Name = Group;
+            settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+
+            foreach (var entry in subscriptions_)
+            {
+                settings.Subscription.Subscriptions.Add(entry.Value);
+            }
+        }
+
+        public override void Start()
+        {
+            base.Start();
+            base.createSession(_accessPoint.TargetUrl());
+        }
+
+        public override void Shutdown()
+        {
+            base.Shutdown();
+        }
+
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+        }
+
+        public void Subscribe(string topic, rmq::FilterType filterType, string expression)
+        {
+            var entry = new rmq::SubscriptionEntry();
+            entry.Topic = new rmq::Resource();
+            entry.Topic.Name = topic;
+            entry.Topic.ResourceNamespace = ResourceNamespace;
+            entry.Expression = new rmq::FilterExpression();
+            entry.Expression.Type = filterType;
+            entry.Expression.Expression = expression;
+            subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+        }
+
+        private string group_;
+
+        public string Group
+        {
+            get { return group_; }
+        }
+
+        private bool fifo_;
+
+        private ConcurrentDictionary<string, rmq::SubscriptionEntry> subscriptions_;
+
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticCredentialsProvider.cs b/rocketmq-client-csharp/StaticCredentialsProvider.cs
index d00dba6..edd810d 100644
--- a/rocketmq-client-csharp/StaticCredentialsProvider.cs
+++ b/rocketmq-client-csharp/StaticCredentialsProvider.cs
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace Org.Apache.Rocketmq {
-    public class StaticCredentialsProvider : ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public class StaticCredentialsProvider : ICredentialsProvider
+    {
 
-        public StaticCredentialsProvider(string accessKey, string accessSecret) {
+        public StaticCredentialsProvider(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials getCredentials() {
+        public Credentials getCredentials()
+        {
             return new Credentials(accessKey, accessSecret);
         }
 
diff --git a/tests/ClientConfigTest.cs b/tests/ClientConfigTest.cs
index 427d1d2..4d8dec1 100644
--- a/tests/ClientConfigTest.cs
+++ b/tests/ClientConfigTest.cs
@@ -17,11 +17,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ClientConfigTest {
+    public class ClientConfigTest
+    {
         [TestMethod]
-        public void testClientId() {
+        public void testClientId()
+        {
             var clientConfig = new ClientConfig();
             string clientId = clientConfig.clientId();
             Assert.IsTrue(clientId.Contains("@"));
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
index 850db63..af5983c 100644
--- a/tests/ClientManagerTest.cs
+++ b/tests/ClientManagerTest.cs
@@ -19,13 +19,16 @@ using Grpc.Core;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class ClientManagerTest {
-        
+    public class ClientManagerTest
+    {
+
         [TestMethod]
-        public void TestResolveRoute() {
+        public void TestResolveRoute()
+        {
             string topic = "cpp_sdk_standard";
             string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
             var request = new rmq::QueryRouteRequest();
diff --git a/tests/ConfigFileCredentialsProviderTest.cs b/tests/ConfigFileCredentialsProviderTest.cs
index 0d46b98..7741295 100644
--- a/tests/ConfigFileCredentialsProviderTest.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -18,11 +18,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ConfigFileCredentialsProviderTest {
+    public class ConfigFileCredentialsProviderTest
+    {
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var provider = new ConfigFileCredentialsProvider();
             var credentials = provider.getCredentials();
             Assert.IsNotNull(credentials);
diff --git a/tests/DateTimeTest.cs b/tests/DateTimeTest.cs
index 0d9a2a7..fdf7d53 100644
--- a/tests/DateTimeTest.cs
+++ b/tests/DateTimeTest.cs
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace Org.Apache.Rocketmq {
-    
+namespace Org.Apache.Rocketmq
+{
+
     [TestClass]
-    public class DateTimeTest {
-        
+    public class DateTimeTest
+    {
+
         [TestMethod]
-        public void testFormat() {
+        public void testFormat()
+        {
             DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56);
-            string time =  instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
+            string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
             string expected = "20220215T083156Z";
             Assert.AreEqual(time, expected);
         }
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index 2de9f54..f1c71f8 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -19,12 +19,15 @@ using System;
 using System.Text;
 using System.Collections.Generic;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class MessageTest {
+    public class MessageTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             var msg1 = new Message();
             Assert.IsNotNull(msg1.MessageId);
             Assert.IsTrue(msg1.MessageId.StartsWith("01"));
@@ -36,7 +39,8 @@ namespace Org.Apache.Rocketmq {
         }
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -49,7 +53,8 @@ namespace Org.Apache.Rocketmq {
         }
 
         [TestMethod]
-        public void testCtor3() {
+        public void testCtor3()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -63,7 +68,8 @@ namespace Org.Apache.Rocketmq {
         }
 
         [TestMethod]
-        public void testCtor4() {
+        public void testCtor4()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -81,7 +87,8 @@ namespace Org.Apache.Rocketmq {
         }
 
         [TestMethod]
-        public void testCtor5() {
+        public void testCtor5()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs
index 444530b..78f01de 100644
--- a/tests/PushConsumerTest.cs
+++ b/tests/PushConsumerTest.cs
@@ -24,26 +24,28 @@ namespace Org.Apache.Rocketmq
 
     public class TestMessageListener : IMessageListener
     {
-        public async Task Consume(List<Message> messages, List<Message> failed)
+        public Task Consume(List<Message> messages, List<Message> failed)
         {
             foreach (var message in messages)
             {
                 Console.WriteLine("");
             }
-        }
 
+            return Task.CompletedTask;
+        }
     }
 
     public class CountableMessageListener : IMessageListener
     {
-        public async Task Consume(List<Message> messages, List<Message> failed)
+        public Task Consume(List<Message> messages, List<Message> failed)
         {
             foreach (var message in messages)
             {
                 Console.WriteLine("{}", message.MessageId);
             }
-        }
 
+            return Task.CompletedTask;
+        }
     }
 
     [TestClass]
diff --git a/tests/SendResultTest.cs b/tests/SendResultTest.cs
index 475cf6d..4e3d9a0 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SendResultTest.cs
@@ -17,13 +17,16 @@
 
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SendResultTest {
+    public class SendResultTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             string messageId = new string("abc");
             var sendResult = new SendReceipt(messageId);
             Assert.AreEqual(messageId, sendResult.MessageId);
@@ -32,7 +35,8 @@ namespace Org.Apache.Rocketmq {
 
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string messageId = new string("abc");
             var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
             Assert.AreEqual(messageId, sendResult.MessageId);
@@ -40,5 +44,5 @@ namespace Org.Apache.Rocketmq {
         }
 
     }
-    
+
 }
\ No newline at end of file
diff --git a/tests/SignatureTest.cs b/tests/SignatureTest.cs
index fd6b525..16d0f46 100644
--- a/tests/SignatureTest.cs
+++ b/tests/SignatureTest.cs
@@ -19,10 +19,12 @@ using grpc = global::Grpc.Core;
 using Moq;
 using System;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SignatureTest {
+    public class SignatureTest
+    {
 
         [TestMethod]
         public void testSign()
@@ -33,7 +35,7 @@ namespace Org.Apache.Rocketmq {
             mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
             mock.Setup(x => x.serviceName()).Returns("mq");
             mock.Setup(x => x.region()).Returns("cn-hangzhou");
-            
+
             string accessKey = "key";
             string accessSecret = "secret";
             var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/SendResultTest.cs b/tests/SimpleConsumerTest.cs
similarity index 55%
copy from tests/SendResultTest.cs
copy to tests/SimpleConsumerTest.cs
index 475cf6d..1bc1a45 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -14,31 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = Apache.Rocketmq.V2;
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SendResultTest {
+    public class SimpleConsumerTest
+    {
 
         [TestMethod]
-        public void testCtor() {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId);
-            Assert.AreEqual(messageId, sendResult.MessageId);
-            Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status);
-        }
-
+        public void TestStart()
+        {
+            var accessPoint = new AccessPoint();
+            var host = "11.166.42.94";
+            var port = 8081;
+            accessPoint.Host = host;
+            accessPoint.Port = port;
+            var resourceNamespace = "";
+            var group = "GID_cpp_sdk_standard";
+            var topic = "cpp_sdk_standard";
 
-        [TestMethod]
-        public void testCtor2() {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
-            Assert.AreEqual(messageId, sendResult.MessageId);
-            Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status);
+            var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
+            simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
+            simpleConsumer.Start();
+            Thread.Sleep(10_000);
         }
 
     }
-    
+
+
 }
\ No newline at end of file
diff --git a/tests/StaticCredentialsProviderTest.cs b/tests/StaticCredentialsProviderTest.cs
index 20b9450..8b5f012 100644
--- a/tests/StaticCredentialsProviderTest.cs
+++ b/tests/StaticCredentialsProviderTest.cs
@@ -17,12 +17,15 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
 
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class StaticCredentialsProviderTest {
+    public class StaticCredentialsProviderTest
+    {
 
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var accessKey = "key";
             var accessSecret = "secret";
             var provider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/TopicTest.cs b/tests/TopicTest.cs
index 7d9f3f4..9f386de 100644
--- a/tests/TopicTest.cs
+++ b/tests/TopicTest.cs
@@ -17,13 +17,16 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System.Collections.Generic;
 
-namespace Org.Apache.Rocketmq {
-     
-     [TestClass]
-     public class TopicTest {
+namespace Org.Apache.Rocketmq
+{
 
-         [TestMethod]
-         public void testCompareTo() {
+    [TestClass]
+    public class TopicTest
+    {
+
+        [TestMethod]
+        public void testCompareTo()
+        {
             List<Topic> topics = new List<Topic>();
             topics.Add(new Topic("ns1", "t1"));
             topics.Add(new Topic("ns0", "t1"));
@@ -36,13 +39,13 @@ namespace Org.Apache.Rocketmq {
 
             Assert.AreEqual(topics[1].ResourceNamespace, "ns0");
             Assert.AreEqual(topics[1].Name, "t1");
-            
+
 
             Assert.AreEqual(topics[2].ResourceNamespace, "ns1");
             Assert.AreEqual(topics[2].Name, "t1");
-            
+
         }
 
 
-     }
- }
\ No newline at end of file
+    }
+}
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index c0b9357..bbf537a 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -16,36 +16,38 @@ namespace tests
         public void TestMethod1()
         {
             rmq::Permission perm = rmq::Permission.None;
-            switch(perm) {
-                case rmq::Permission.None:
-                {
-                    Console.WriteLine("None");
-                    break;
-                }
+            switch (perm)
+            {
+                case rmq::Permission.None:
+                    {
+                        Console.WriteLine("None");
+                        break;
+                    }
 
-                case rmq::Permission.Read:
-                {
-                    Console.WriteLine("Read");
-                    break;
-                }
+                case rmq::Permission.Read:
+                    {
+                        Console.WriteLine("Read");
+                        break;
+                    }
 
-                case rmq::Permission.Write:
-                {
-                    Console.WriteLine("Write");
-                    break;
-                }
+                case rmq::Permission.Write:
+                    {
+                        Console.WriteLine("Write");
+                        break;
+                    }
 
-                case rmq::Permission.ReadWrite:
-                {
-                    Console.WriteLine("ReadWrite");
-                    break;
-                }
+                case rmq::Permission.ReadWrite:
+                    {
+                        Console.WriteLine("ReadWrite");
+                        break;
+                    }
 
             }
         }
 
         [TestMethod]
-        public void TestRpcClientImplCtor() {
+        public void TestRpcClientImplCtor()
+        {
             RpcClient impl = new RpcClient("https://localhost:5001");
         }
 


[rocketmq-client-csharp] 03/04: WIP

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit 86083310e94f41a27d470f6b8d63b27394e63168
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 16:44:26 2022 +0800

    WIP
---
 rocketmq-client-csharp/Client.cs         | 15 +++++++++++++++
 rocketmq-client-csharp/IClient.cs        |  1 +
 rocketmq-client-csharp/Producer.cs       |  2 ++
 rocketmq-client-csharp/Session.cs        |  2 +-
 rocketmq-client-csharp/SimpleConsumer.cs | 16 ++++++++++++++++
 5 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 1eb368e..607daf4 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -398,6 +398,21 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
+        public virtual void OnReceive(rmq::Settings settings)
+        {
+            if (null != settings.Metric)
+            {
+                _clientSettings.Metric = new rmq::Metric();
+                _clientSettings.Metric.MergeFrom(settings.Metric);
+            }
+
+            if (null != settings.BackoffPolicy)
+            {
+                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+            }
+        }
+
         protected readonly IClientManager Manager;
 
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index abdcc21..4b7206b 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,5 +31,6 @@ namespace Org.Apache.Rocketmq
         void BuildClientSetting(rmq::Settings settings);
 
 
+        void OnReceive(rmq::Settings settings);
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index bfcc1d3..32606ae 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -47,7 +47,9 @@ namespace Org.Apache.Rocketmq
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.Producer;
 
+            // Concept of ProducerGroup has been removed.
         }
 
         public async Task<SendReceipt> Send(Message message)
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index 3e234f2..f5e7795 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -60,8 +60,8 @@ namespace Org.Apache.Rocketmq
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
                             {
-
                                 Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                                _client.OnReceive(cmd.Settings);
                                 break;
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 4c447c9..9eaf365 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -19,6 +19,7 @@ using rmq = Apache.Rocketmq.V2;
 using NLog;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
+using Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
@@ -63,6 +64,10 @@ namespace Org.Apache.Rocketmq
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.SimpleConsumer;
+            request.Group = new rmq::Resource();
+            request.Group.Name = Group;
+            request.Group.ResourceNamespace = ResourceNamespace;
         }
 
         public void Subscribe(string topic, rmq::FilterType filterType, string expression)
@@ -77,6 +82,17 @@ namespace Org.Apache.Rocketmq
             subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
         }
 
+        public override void OnReceive(Settings settings)
+        {
+            base.OnReceive(settings);
+
+            if (settings.Subscription.Fifo)
+            {
+                fifo_ = true;
+            }
+
+        }
+
         private string group_;
 
         public string Group


[rocketmq-client-csharp] 04/04: Make Shutdown async

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit a6b1ee9e0525e004a01fa78148da8cf13f3526a5
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 17:04:44 2022 +0800

    Make Shutdown async
---
 rocketmq-client-csharp/Client.cs         |  6 +++---
 rocketmq-client-csharp/IClient.cs        |  2 +-
 rocketmq-client-csharp/IConsumer.cs      |  4 +++-
 rocketmq-client-csharp/IProducer.cs      |  2 +-
 rocketmq-client-csharp/Producer.cs       |  4 ++--
 rocketmq-client-csharp/PushConsumer.cs   |  4 ++--
 rocketmq-client-csharp/Session.cs        |  2 +-
 rocketmq-client-csharp/SimpleConsumer.cs | 13 +++++++++----
 tests/ProducerTest.cs                    |  2 +-
 tests/SimpleConsumerTest.cs              |  4 +++-
 10 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 607daf4..28ef42b 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -75,11 +75,11 @@ namespace Org.Apache.Rocketmq
             }, 30, _updateTopicRouteCts.Token);
         }
 
-        public virtual void Shutdown()
+        public virtual async Task Shutdown()
         {
             Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
-            Manager.Shutdown().GetAwaiter().GetResult();
+            await Manager.Shutdown();
         }
 
         protected string FilterBroker(Func<string, bool> acceptor)
@@ -398,7 +398,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public virtual void OnReceive(rmq::Settings settings)
+        public virtual void OnSettingsReceived(rmq::Settings settings)
         {
             if (null != settings.Metric)
             {
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 4b7206b..461c452 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,6 +31,6 @@ namespace Org.Apache.Rocketmq
         void BuildClientSetting(rmq::Settings settings);
 
 
-        void OnReceive(rmq::Settings settings);
+        void OnSettingsReceived(rmq::Settings settings);
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs
index ac4d787..de27f1f 100644
--- a/rocketmq-client-csharp/IConsumer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System.Threading.Tasks;
 namespace Org.Apache.Rocketmq
 {
     public interface IConsumer
     {
         void Start();
 
-        void Shutdown();
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 9c30c6c..088df5e 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq
     {
         void Start();
 
-        void Shutdown();
+        Task Shutdown();
 
         Task<SendReceipt> Send(Message message);
     }
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 32606ae..4a39f33 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq
             // More initialization
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
             // Release local resources
-            base.Shutdown();
+            await base.Shutdown();
         }
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index 909e7a2..3b37950 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -66,13 +66,13 @@ namespace Org.Apache.Rocketmq
             }, 10, _scanExpiredProcessQueueCTS.Token);
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
             _scanAssignmentCTS.Cancel();
             _scanExpiredProcessQueueCTS.Cancel();
 
             // Shutdown resources of derived class
-            base.Shutdown();
+            await base.Shutdown();
         }
 
         private async Task scanLoadAssignments()
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index f5e7795..51eb09c 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
                             {
                                 Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
-                                _client.OnReceive(cmd.Settings);
+                                _client.OnSettingsReceived(cmd.Settings);
                                 break;
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 9eaf365..afd447a 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -17,6 +17,7 @@
 
 using rmq = Apache.Rocketmq.V2;
 using NLog;
+using System.Threading.Tasks;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 using Apache.Rocketmq.V2;
@@ -57,9 +58,13 @@ namespace Org.Apache.Rocketmq
             base.createSession(_accessPoint.TargetUrl());
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
-            base.Shutdown();
+            await base.Shutdown();
+            if (!await NotifyClientTermination())
+            {
+                Logger.Warn("Failed to NotifyClientTermination");
+            }
         }
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
@@ -82,9 +87,9 @@ namespace Org.Apache.Rocketmq
             subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
         }
 
-        public override void OnReceive(Settings settings)
+        public override void OnSettingsReceived(Settings settings)
         {
-            base.OnReceive(settings);
+            base.OnSettingsReceived(settings);
 
             if (settings.Subscription.Fifo)
             {
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index a6746ff..baeca17 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -52,7 +52,7 @@ namespace Org.Apache.Rocketmq
             var msg = new Message(topic, body);
             var sendResult = await producer.Send(msg);
             Assert.IsNotNull(sendResult);
-            producer.Shutdown();
+            await producer.Shutdown();
         }
 
         private static string resourceNamespace = "";
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 1bc1a45..29f155f 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -17,6 +17,7 @@
 using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
@@ -26,7 +27,7 @@ namespace Org.Apache.Rocketmq
     {
 
         [TestMethod]
-        public void TestStart()
+        public async Task TestStart()
         {
             var accessPoint = new AccessPoint();
             var host = "11.166.42.94";
@@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq
             simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
             simpleConsumer.Start();
             Thread.Sleep(10_000);
+            await simpleConsumer.Shutdown();
         }
 
     }