You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/18 07:05:45 UTC

[rocketmq-client-csharp] branch develop updated: Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new cbe22d2  Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)
cbe22d2 is described below

commit cbe22d2546118b1282bd0fe98ba82fe88323e2b2
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Fri Feb 18 15:05:05 2022 +0800

    Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)
---
 README.md                                          |   2 +-
 rocketmq-client-csharp/Broker.cs                   |   9 +
 rocketmq-client-csharp/Client.cs                   | 261 +++++++++++++++++++++
 rocketmq-client-csharp/ClientLoggerInterceptor.cs  | 134 +++++++++++
 rocketmq-client-csharp/ClientManager.cs            |  88 ++++++-
 .../{IRpcClient.cs => ClientManagerFactory.cs}     |  74 +++---
 .../ConfigFileCredentialsProvider.cs               |  63 +++++
 .../{IRpcClient.cs => IClient.cs}                  |  61 ++---
 rocketmq-client-csharp/IClientManager.cs           |   6 +
 .../{IRpcClient.cs => INameServerResolver.cs}      |  56 +++--
 .../{IClientManager.cs => IProducer.cs}            |  13 +-
 rocketmq-client-csharp/IRpcClient.cs               |   7 +-
 rocketmq-client-csharp/Message.cs                  |  85 +++++++
 .../{IRpcClient.cs => MessageType.cs}              |  57 +++--
 rocketmq-client-csharp/Producer.cs                 | 135 +++++++++++
 rocketmq-client-csharp/PublishLoadBalancer.cs      | 119 ++++++++++
 rocketmq-client-csharp/RpcClient.cs                |  23 +-
 rocketmq-client-csharp/Signature.cs                |   2 +-
 .../{IRpcClient.cs => StaticNameServerResolver.cs} |  67 +++---
 .../{IRpcClient.cs => TopicRouteException.cs}      |  57 +++--
 tests/ClientManagerTest.cs                         |  57 +++++
 .../ConfigFileCredentialsProviderTest.cs           |  17 +-
 tests/MessageTest.cs                               | 112 +++++++++
 tests/RpcClientTest.cs                             | 113 +++++++++
 .../StaticNameServerResolverTest.cs                |  64 ++---
 25 files changed, 1447 insertions(+), 235 deletions(-)

diff --git a/README.md b/README.md
index 8191957..7c8935d 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ dotnet build
 
 #### Run Unit Tests
 ```sh
-dotnet test
+dotnet test -l "console;verbosity=detailed"
 ```
 
 #### Run Examples
diff --git a/rocketmq-client-csharp/Broker.cs b/rocketmq-client-csharp/Broker.cs
index e909bf7..2f5f675 100644
--- a/rocketmq-client-csharp/Broker.cs
+++ b/rocketmq-client-csharp/Broker.cs
@@ -41,6 +41,15 @@ namespace org.apache.rocketmq {
             get { return address; }
         }
 
+        /**
+         * Builds the URL of this broker's primary endpoint.
+         *
+         * The primary endpoint is the first entry of the broker's address list; the result has the
+         * form https://host:port.
+         */
+        public string targetUrl()
+        {
+            var primary = address.Addresses[0];
+            return $"https://{primary.Host}:{primary.Port}";
+        }
+
         public int CompareTo(Broker other) {
             if (0 != name.CompareTo(other.name)) {
                 return name.CompareTo(other.name);
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
new file mode 100644
index 0000000..e3cb547
--- /dev/null
+++ b/rocketmq-client-csharp/Client.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Threading.Tasks;
+using System.Threading;
+using System;
+using rmq = apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+
+
+namespace org.apache.rocketmq
+{
+    /**
+     * Base class of RocketMQ clients.
+     *
+     * It keeps the list of name servers and a table of topic routes, refreshes both periodically
+     * once start() has been called, and offers route lookup and client-termination notification
+     * to its sub-classes.
+     */
+    public abstract class Client : ClientConfig, IClient
+    {
+
+        /**
+         * Parameters:
+         * resolver
+         *    Source of name server addresses. Each address is expected in "host:port" form.
+         */
+        public Client(INameServerResolver resolver)
+        {
+            this.nameServerResolver = resolver;
+            this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace());
+            this.nameServerResolverCTS = new CancellationTokenSource();
+
+            this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
+            this.updateTopicRouteCTS = new CancellationTokenSource();
+        }
+
+        /**
+         * Starts the two background refresh loops: name server list and topic routes.
+         */
+        public virtual void start()
+        {
+            // The refresh routines are handed over as Func<Task> so that the scheduler can await them
+            // and observe their failures. Passing an async lambda as an Action would make it
+            // "async void", whose exceptions cannot be caught by the caller.
+            scheduleAsync(updateNameServerList, refreshIntervalSeconds, nameServerResolverCTS.Token);
+            scheduleAsync(updateTopicRoute, refreshIntervalSeconds, updateTopicRouteCTS.Token);
+        }
+
+        /**
+         * Stops the background refresh loops started by start().
+         */
+        public virtual void shutdown()
+        {
+            updateTopicRouteCTS.Cancel();
+            nameServerResolverCTS.Cancel();
+        }
+
+        /**
+         * Resolves the name server list again and replaces the current one if its content changed.
+         */
+        private async Task updateNameServerList()
+        {
+            List<string> resolved = await nameServerResolver.resolveAsync();
+            if (null == resolved || 0 == resolved.Count)
+            {
+                // Whoops, something should be wrong. We got an empty name server list.
+                return;
+            }
+
+            lock (nameServersLock)
+            {
+                // List<T>.Equals is reference equality, so the entries are compared one by one.
+                if (null != nameServers && sameEntries(nameServers, resolved))
+                {
+                    return;
+                }
+
+                // Name server list is updated.
+                nameServers = resolved;
+                currentNameServerIndex = 0;
+            }
+        }
+
+        /**
+         * Returns true when both lists hold the same addresses in the same order.
+         */
+        private static bool sameEntries(List<string> left, List<string> right)
+        {
+            if (left.Count != right.Count)
+            {
+                return false;
+            }
+
+            for (int i = 0; i < left.Count; i++)
+            {
+                if (!string.Equals(left[i], right[i], StringComparison.Ordinal))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Returns the name server currently in use, resolving the list first if none is known yet.
+         * Returns null when no name server could be resolved.
+         */
+        private async Task<string> acquireNameServer()
+        {
+            lock (nameServersLock)
+            {
+                if (null != nameServers && 0 != nameServers.Count)
+                {
+                    return nameServers[currentNameServerIndex];
+                }
+            }
+
+            // Resolution is awaited outside of the lock; await is not allowed inside a lock statement.
+            List<string> list = await nameServerResolver.resolveAsync();
+            if (null == list || 0 == list.Count)
+            {
+                // TODO: log warning here.
+                return null;
+            }
+
+            lock (nameServersLock)
+            {
+                // Another task may have installed a list in the meantime; keep that one.
+                if (null == nameServers || 0 == nameServers.Count)
+                {
+                    nameServers = list;
+                    currentNameServerIndex = 0;
+                }
+                return nameServers[currentNameServerIndex];
+            }
+        }
+
+        /**
+         * Queries the route of every topic held in the route table and stores the entries that changed.
+         */
+        private async Task updateTopicRoute()
+        {
+            if (null == await acquireNameServer())
+            {
+                // No name server available, nothing can be queried.
+                return;
+            }
+
+            List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+            foreach (var item in topicRouteTable)
+            {
+                tasks.Add(getRouteFor(item.Key, true));
+            }
+
+            // Update topic route data
+            TopicRouteData[] result = await Task.WhenAll(tasks);
+            foreach (var item in result)
+            {
+                if (null == item || 0 == item.Partitions.Count)
+                {
+                    continue;
+                }
+
+                var topicName = item.Partitions[0].Topic.Name;
+                if (!topicRouteTable.TryGetValue(topicName, out var existing))
+                {
+                    // The returned topic name is not a key of the table; a plain indexer read would
+                    // throw KeyNotFoundException here. Only topics already present are refreshed.
+                    continue;
+                }
+
+                if (null == existing || !existing.Equals(item))
+                {
+                    topicRouteTable[topicName] = item;
+                }
+            }
+        }
+
+        /**
+         * Runs the given action repeatedly on a background task until the token is cancelled.
+         *
+         * Parameters:
+         * action
+         *    Work to execute; a null action is ignored.
+         * seconds
+         *    Pause between two executions.
+         * token
+         *    Cancelling this token stops the loop.
+         */
+        public void schedule(Action action, int seconds, CancellationToken token)
+        {
+            if (null == action)
+            {
+                // TODO: log warning
+                return;
+            }
+
+            scheduleAsync(() =>
+            {
+                action();
+                return Task.CompletedTask;
+            }, seconds, token);
+        }
+
+        /**
+         * Asynchronous counterpart of schedule(): every execution is awaited before the pause starts.
+         * A failing execution is reported and does not end the loop.
+         */
+        private void scheduleAsync(Func<Task> action, int seconds, CancellationToken token)
+        {
+            Task.Run(async () =>
+            {
+                while (!token.IsCancellationRequested)
+                {
+                    try
+                    {
+                        await action();
+                    }
+                    catch (Exception e)
+                    {
+                        // TODO: route this to a proper logger once one is available.
+                        Console.Error.WriteLine($"Scheduled task failed: {e}");
+                    }
+
+                    try
+                    {
+                        await Task.Delay(TimeSpan.FromSeconds(seconds), token);
+                    }
+                    catch (OperationCanceledException)
+                    {
+                        // Cancelled while waiting: leave the loop normally.
+                        break;
+                    }
+                }
+            });
+        }
+
+        /**
+         * Parameters:
+         * topic
+         *    Topic to query
+         * direct
+         *    Indicate if we should by-pass cache and fetch route entries from name server.
+         *
+         * Returns the route of the topic, or null when no name server is available.
+         * Throws FormatException when the selected name server address is not in "host:port" form.
+         *
+         * NOTE(review): nothing in this class inserts new topics into topicRouteTable, so the cached
+         * branch is only taken if entries are added elsewhere - confirm whether results should be cached here.
+         */
+        public async Task<TopicRouteData> getRouteFor(string topic, bool direct)
+        {
+            if (!direct && topicRouteTable.TryGetValue(topic, out var cached))
+            {
+                return cached;
+            }
+
+            string nameServer = await acquireNameServer();
+            if (null == nameServer)
+            {
+                // TODO: log warning here.
+                return null;
+            }
+
+            string[] segments = nameServer.Split(":");
+            int port;
+            if (segments.Length < 2 || !Int32.TryParse(segments[1], out port))
+            {
+                throw new FormatException($"Name server address '{nameServer}' is not in host:port form");
+            }
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+
+            var address = new rmq::Address();
+            address.Host = segments[0];
+            address.Port = port;
+
+            var request = new rmq::QueryRouteRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = resourceNamespace();
+            request.Topic.Name = topic;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+            request.Endpoints.Addresses.Add(address);
+
+            var target = string.Format("https://{0}:{1}", segments[0], segments[1]);
+            return await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
+        }
+
+        /**
+         * Lets the concrete client fill in its part of the heartbeat request.
+         */
+        public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
+
+        /**
+         * Prepares a heartbeat request for the endpoints in use.
+         */
+        public void heartbeat()
+        {
+            List<string> endpoints = endpointsInUse();
+            if (0 == endpoints.Count)
+            {
+                return;
+            }
+
+            var heartbeatRequest = new rmq::HeartbeatRequest();
+            prepareHeartbeatData(heartbeatRequest);
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            // NOTE(review): the request and its metadata are prepared but nothing is sent yet.
+        }
+
+        /**
+         * Placeholder, not implemented yet.
+         */
+        public void healthCheck()
+        {
+
+        }
+
+        /**
+         * Notifies every endpoint in use that this client is terminating.
+         *
+         * Returns true when all endpoints acknowledged the notification (also when there is no endpoint).
+         */
+        public async Task<bool> notifyClientTermination()
+        {
+            List<string> endpoints = endpointsInUse();
+            var request = new rmq::NotifyClientTerminationRequest();
+            request.ClientId = clientId();
+
+            List<Task<Boolean>> tasks = new List<Task<Boolean>>();
+
+            foreach (var endpoint in endpoints)
+            {
+                // Each call gets its own metadata: the calls run concurrently and Metadata is a
+                // mutable collection that should not be shared between them.
+                var metadata = new grpc::Metadata();
+                Signature.sign(this, metadata);
+                tasks.Add(clientManager.notifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+            }
+
+            bool[] results = await Task.WhenAll(tasks);
+            return Array.TrueForAll(results, acknowledged => acknowledged);
+        }
+
+        /**
+         * Endpoints this client currently talks to; always empty for now.
+         */
+        private List<string> endpointsInUse()
+        {
+            //TODO: gather endpoints from route entries.
+            return new List<string>();
+        }
+
+        // Pause, in seconds, between two runs of the background refresh loops.
+        private const int refreshIntervalSeconds = 30;
+
+        protected IClientManager clientManager;
+        private readonly INameServerResolver nameServerResolver;
+        private readonly CancellationTokenSource nameServerResolverCTS;
+
+        // Guards nameServers and currentNameServerIndex, which are shared with the refresh loops.
+        private readonly object nameServersLock = new object();
+        private List<string> nameServers;
+        private int currentNameServerIndex;
+
+        private readonly ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
+        private readonly CancellationTokenSource updateTopicRouteCTS;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
new file mode 100644
index 0000000..59ec0f2
--- /dev/null
+++ b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * gRPC client interceptor that writes a line to the console for every call it sees and attaches
+     * caller information (user, machine, OS) to the request headers.
+     */
+    public class ClientLoggerInterceptor : Interceptor
+    {
+        /** Logs and decorates a blocking unary call, then forwards it. */
+        public override TResponse BlockingUnaryCall<TRequest, TResponse>(
+            TRequest request,
+            ClientInterceptorContext<TRequest, TResponse> context,
+            BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
+        {
+            LogCall(context.Method);
+            AddCallerMetadata(ref context);
+
+            return continuation(request, context);
+        }
+
+        /**
+         * Logs and decorates an asynchronous unary call, then forwards it. The response task is
+         * wrapped so that the response, or the failure, is written to the console as well.
+         */
+        public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
+            TRequest request,
+            ClientInterceptorContext<TRequest, TResponse> context,
+            AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
+        {
+            LogCall(context.Method);
+            AddCallerMetadata(ref context);
+
+            var call = continuation(request, context);
+
+            return new AsyncUnaryCall<TResponse>(HandleResponse(call.ResponseAsync), call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
+        }
+
+        /**
+         * Awaits the response task, prints the outcome to the console and passes it on unchanged;
+         * a failure is rethrown after it has been printed.
+         */
+        private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> t)
+        {
+            try
+            {
+                var response = await t;
+                Console.WriteLine($"Response received: {response}");
+                return response;
+            }
+            catch (Exception ex)
+            {
+                // Log error to the console.
+                // Note: Configuring .NET Core logging is the recommended way to log errors
+                // https://docs.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
+                var initialColor = Console.ForegroundColor;
+                Console.ForegroundColor = ConsoleColor.Red;
+                Console.WriteLine($"Call error: {ex.Message}");
+                Console.ForegroundColor = initialColor;
+
+                throw;
+            }
+        }
+
+        /** Logs and decorates a client-streaming call, then forwards it. */
+        public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
+            ClientInterceptorContext<TRequest, TResponse> context,
+            AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
+        {
+            LogCall(context.Method);
+            AddCallerMetadata(ref context);
+
+            return continuation(context);
+        }
+
+        /** Logs and decorates a server-streaming call, then forwards it. */
+        public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
+            TRequest request,
+            ClientInterceptorContext<TRequest, TResponse> context,
+            AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
+        {
+            LogCall(context.Method);
+            AddCallerMetadata(ref context);
+
+            return continuation(request, context);
+        }
+
+        /** Logs and decorates a duplex-streaming call, then forwards it. */
+        public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
+            ClientInterceptorContext<TRequest, TResponse> context,
+            AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
+        {
+            LogCall(context.Method);
+            AddCallerMetadata(ref context);
+
+            return continuation(context);
+        }
+
+        /** Prints the call type and the request/response types in green. */
+        private void LogCall<TRequest, TResponse>(Method<TRequest, TResponse> method)
+            where TRequest : class
+            where TResponse : class
+        {
+            var initialColor = Console.ForegroundColor;
+            Console.ForegroundColor = ConsoleColor.Green;
+            Console.WriteLine($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
+            Console.ForegroundColor = initialColor;
+        }
+
+        /**
+         * Replaces the context by one whose headers are the original headers plus the caller entries.
+         *
+         * The entries are added to a fresh Metadata instance rather than to the one supplied by the
+         * caller: a caller may reuse a single Metadata object for several calls, and appending to it
+         * in place would add another set of caller entries on every call.
+         */
+        private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
+            where TRequest : class
+            where TResponse : class
+        {
+            var headers = new Metadata();
+
+            // Carry over whatever the caller supplied; the call may have no headers collection at all.
+            var suppliedHeaders = context.Options.Headers;
+            if (suppliedHeaders != null)
+            {
+                foreach (var entry in suppliedHeaders)
+                {
+                    headers.Add(entry);
+                }
+            }
+
+            // Add caller metadata to call headers
+            headers.Add("caller-user", Environment.UserName);
+            headers.Add("caller-machine", Environment.MachineName);
+            headers.Add("caller-os", Environment.OSVersion.ToString());
+
+            var options = context.Options.WithHeaders(headers);
+            context = new ClientInterceptorContext<TRequest, TResponse>(context.Method, context.Host, options);
+        }
+    }
+}
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 4dd8790..59fec83 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -20,9 +20,12 @@ using System.Collections.Concurrent;
 using rmq = global::apache.rocketmq.v1;
 using Grpc.Net.Client;
 using System;
+using System.Threading;
 using System.Threading.Tasks;
 using grpc = global::Grpc.Core;
 using System.Collections.Generic;
+using Grpc.Core.Interceptors;
+using System.Net.Http;
 
 namespace org.apache.rocketmq {
     public class ClientManager : IClientManager {
@@ -33,8 +36,11 @@ namespace org.apache.rocketmq {
 
         public IRpcClient getRpcClient(string target) {
             if (!rpcClients.ContainsKey(target)) {
-                using var channel = GrpcChannel.ForAddress(target);
-                var client = new rmq.MessagingService.MessagingServiceClient(channel);
+                var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions {
+                    HttpHandler = createHttpHandler()
+                });
+                var invoker = channel.Intercept(new ClientLoggerInterceptor());
+                var client = new rmq::MessagingService.MessagingServiceClient(invoker);
                 var rpcClient = new RpcClient(client);
                 if(rpcClients.TryAdd(target, rpcClient)) {
                     return rpcClient;
@@ -43,11 +49,32 @@ namespace org.apache.rocketmq {
             return rpcClients[target];
         }
 
-        public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq.QueryRouteRequest request, TimeSpan timeout) {
+        /**
+         * Creates the HTTP handler used by the gRPC channels, with server certificate validation disabled.
+         *
+         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
+         * why parameters are configured this way.
+         *
+         * WARNING: this form accepts any server certificate, which is only acceptable during development.
+         * Use createHttpHandler(true) when the server certificate must be verified.
+         */
+        public static HttpMessageHandler createHttpHandler()
+        {
+            return createHttpHandler(false);
+        }
+
+        /**
+         * Creates the HTTP handler used by the gRPC channels.
+         *
+         * Parameters:
+         * validateServerCertificate
+         *    When true, the default certificate validation stays in place.
+         *    When false, every server certificate is accepted.
+         */
+        public static HttpMessageHandler createHttpHandler(bool validateServerCertificate)
+        {
+            var sslOptions = new System.Net.Security.SslClientAuthenticationOptions();
+            if (!validateServerCertificate)
+            {
+                // Disable server certificate validation during development phase.
+                sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+            }
+
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+                EnableMultipleHttp2Connections = true,
+                SslOptions = sslOptions,
+            };
+            return handler;
+        }
+
+        public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+        {
             var rpcClient = getRpcClient(target);
-            var callOptions = new grpc::CallOptions();
-            callOptions.WithDeadline(DateTime.Now.Add(timeout))
-                .WithHeaders(metadata);
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new grpc::CallOptions(metadata, deadline);
             var queryRouteResponse = await rpcClient.queryRoute(request, callOptions);
 
             if (queryRouteResponse.Common.Status.Code != ((int)Google.Rpc.Code.Ok)) {
@@ -63,22 +90,22 @@ namespace org.apache.rocketmq {
                 var id = partition.Id;
                 Permission permission = Permission.READ_WRITE;
                 switch (partition.Permission) {
-                    case rmq.Permission.None:
+                    case rmq::Permission.None:
                     {
                         permission = Permission.NONE;
                         break;
                     }
-                    case rmq.Permission.Read:
+                    case rmq::Permission.Read:
                     {
                         permission = Permission.READ;
                         break;
                     }
-                    case rmq.Permission.Write:
+                    case rmq::Permission.Write:
                     {
                         permission = Permission.WRITE;
                         break;
                     }
-                    case rmq.Permission.ReadWrite:
+                    case rmq::Permission.ReadWrite:
                     {
                         permission = Permission.READ_WRITE;
                         break;
@@ -87,15 +114,18 @@ namespace org.apache.rocketmq {
 
                 AddressScheme scheme = AddressScheme.IPv4;
                 switch(partition.Broker.Endpoints.Scheme) {
-                    case rmq.AddressScheme.Ipv4: {
+                    case rmq::AddressScheme.Ipv4:
+                        {
                         scheme = AddressScheme.IPv4;
                         break;
                     }
-                    case rmq.AddressScheme.Ipv6: {
+                    case rmq::AddressScheme.Ipv6:
+                        {
                         scheme = AddressScheme.IPv6;
                         break;
                     }
-                    case rmq.AddressScheme.DomainName: {
+                    case rmq::AddressScheme.DomainName:
+                        {
                         scheme = AddressScheme.DOMAIN_NAME;
                         break;
                     }
@@ -114,6 +144,38 @@ namespace org.apache.rocketmq {
             return topicRouteData;
         }
 
+        /**
+         * Sends a heartbeat request to the given target.
+         *
+         * Parameters:
+         * target
+         *    Address of the server to call.
+         * metadata
+         *    Headers sent along with the request.
+         * request
+         *    Heartbeat request to send.
+         * timeout
+         *    Time allowed for the call; the deadline is the current UTC time plus this value.
+         *
+         * Returns true when a response was received and its status code is Ok.
+         */
+        public async Task<Boolean> heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+        {
+            var client = getRpcClient(target);
+            var options = new grpc::CallOptions(headers: metadata, deadline: DateTime.UtcNow + timeout);
+            var reply = await client.heartbeat(request, options);
+
+            return null != reply && reply.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+        }
+
+        /**
+         * Sends a message to the given target and returns the server response as is.
+         *
+         * Parameters:
+         * target
+         *    Address of the server to call.
+         * metadata
+         *    Headers sent along with the request.
+         * request
+         *    Send-message request.
+         * timeout
+         *    Time allowed for the call; the deadline is the current UTC time plus this value.
+         */
+        public async Task<rmq::SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
+        {
+            var client = getRpcClient(target);
+            var options = new grpc::CallOptions(headers: metadata, deadline: DateTime.UtcNow + timeout);
+            return await client.sendMessage(request, options);
+        }
+
+        /**
+         * Tells the given target that a client is terminating.
+         *
+         * Parameters:
+         * target
+         *    Address of the server to call.
+         * metadata
+         *    Headers sent along with the request.
+         * request
+         *    Termination notification to send.
+         * timeout
+         *    Time allowed for the call; the deadline is the current UTC time plus this value.
+         *
+         * Returns true when a response was received and its status code is Ok.
+         */
+        public async Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        {
+            var rpcClient = getRpcClient(target);
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new grpc::CallOptions(metadata, deadline);
+            rmq::NotifyClientTerminationResponse response = await rpcClient.notifyClientTermination(request, callOptions);
+            // Same guard as in heartbeat(): a missing response counts as a failure instead of a crash.
+            if (null == response)
+            {
+                return false;
+            }
+
+            return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+        }
+
         private ConcurrentDictionary<string, RpcClient> rpcClients;
 
     }
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/ClientManagerFactory.cs
similarity index 50%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/ClientManagerFactory.cs
index d4102b8..3ca211d 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/ClientManagerFactory.cs
@@ -1,29 +1,45 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * Hands out one shared client manager per resource namespace.
+     */
+    public sealed class ClientManagerFactory
+    {
+        /**
+         * Returns the client manager of the given resource namespace, creating it on first use.
+         *
+         * Parameters:
+         * resourceNamespace
+         *    Key identifying the client manager; must not be null.
+         */
+        public static IClientManager getClientManager(string resourceNamespace)
+        {
+            // GetOrAdd does the lookup and the insertion in one call, so concurrent callers all get
+            // the same instance. The factory delegate may run more than once under contention, but
+            // only one of the created managers is stored and returned.
+            // TODO: configure client managers.
+            return clientManagers.GetOrAdd(resourceNamespace, _ => new ClientManager());
+        }
+
+        private static readonly ConcurrentDictionary<string, IClientManager> clientManagers = new ConcurrentDictionary<string, IClientManager>();
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
new file mode 100644
index 0000000..1381b3f
--- /dev/null
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using System;
+using System.Text.Json;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq {
+
+    /**
+     * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
+     * A sample config content is as follows:
+     * {"AccessKey": "key", "AccessSecret": "secret"}
+     *
+     * The file is read once, when the provider is constructed. If it is missing, unreadable,
+     * not valid JSON or lacks one of the two keys, the provider yields no credentials.
+     */
+    public class ConfigFileCredentialsProvider : ICredentialsProvider {
+
+        public ConfigFileCredentialsProvider() {
+            var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
+            string configFile = home + "/.rocketmq/config";
+            if (!File.Exists(configFile)) {
+                return;
+            }
+
+            try {
+                string json = File.ReadAllText(configFile);
+                var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
+                // TryGetValue instead of the indexer: a missing key must not throw out of the constructor.
+                if (null != kv
+                    && kv.TryGetValue("AccessKey", out var key)
+                    && kv.TryGetValue("AccessSecret", out var secret)) {
+                    accessKey = key;
+                    accessSecret = secret;
+                    valid = true;
+                }
+            } catch (Exception e) when (e is IOException || e is UnauthorizedAccessException || e is JsonException) {
+                // Deliberately best effort: an unreadable or malformed file leaves the provider
+                // without credentials, and getCredentials() then returns null.
+            }
+        }
+
+        /**
+         * Returns the credentials read from the config file, or null when none could be loaded.
+         */
+        public Credentials getCredentials() {
+            if (!valid) {
+                return null;
+            }
+
+            return new Credentials(accessKey, accessSecret);
+        }
+
+        private string accessKey;
+        private string accessSecret;
+
+        private bool valid = false;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IClient.cs
similarity index 78%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/IClient.cs
index d4102b8..7f3ed64 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -1,29 +1,32 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+    // Client-side operations declared on top of the IClientConfig contract.
+    public interface IClient : IClientConfig
+    {
+        // NOTE(review): presumably sends a heartbeat to the servers this client uses — confirm against Client.cs.
+        void heartbeat();
+        // NOTE(review): presumably checks that the endpoints in use are still healthy — confirm against Client.cs.
+        void healthCheck();
+        // NOTE(review): presumably tells the servers this client is terminating; the meaning of the bool is not visible here.
+        Task<bool> notifyClientTermination();
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index ea8ba55..08ed86a 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -26,5 +26,11 @@ namespace org.apache.rocketmq {
 
         Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
 
+        Task<Boolean> heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+
+        Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
+
+        Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/INameServerResolver.cs
similarity index 79%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/INameServerResolver.cs
index d4102b8..568098f 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/INameServerResolver.cs
@@ -1,29 +1,27 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+    public interface INameServerResolver
+    {   // Supplies a list of name server entries; StaticNameServerResolver is a fixed-list implementation.
+        Task<List<string>> resolveAsync(); // Asynchronously yields the list of name server strings.
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IProducer.cs
similarity index 76%
copy from rocketmq-client-csharp/IClientManager.cs
copy to rocketmq-client-csharp/IProducer.cs
index ea8ba55..89f8955 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-using apache.rocketmq.v1;
-using System.Threading.Tasks;
 using System;
-using grpc = global::Grpc.Core;
+using System.Threading.Tasks;
 
 namespace org.apache.rocketmq {
-    public interface IClientManager {
-        IRpcClient getRpcClient(string target);
+    public interface IProducer {
+        void start();
 
-        Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+        void shutdown();
 
+        Task<SendResult> send(Message message);
+        
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index d4102b8..0590bb0 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -23,7 +23,12 @@ namespace org.apache.rocketmq
 {
     public interface IRpcClient
     {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
+        Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
 
+        Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions);
+
+        Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions);
+
+        Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions);
     }
 }
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
new file mode 100644
index 0000000..282e8aa
--- /dev/null
+++ b/rocketmq-client-csharp/Message.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Collections.Generic;
+namespace org.apache.rocketmq
+{
+
+    /// <summary>
+    /// Mutable message model: topic, body, optional tag and keys, and two property maps.
+    /// Every constructor sets the attempt limit to 3.
+    /// </summary>
+    public class Message {
+        // Creates a message without topic, tag or body and with an empty key list.
+        public Message() : this(null, null, new List<string>(), null) {
+        }
+
+        // Creates an untagged message with an empty key list.
+        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {
+        }
+
+        // Creates a tagged message with an empty key list.
+        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+        }
+
+        // Creates a fully specified message; both property maps start out empty.
+        public Message(string topic, string tag, List<string> keys, byte[] body) {
+            Topic = topic;
+            Tag = tag;
+            Keys = keys;
+            Body = body;
+            MaxAttemptTimes = 3;
+            UserProperties = new Dictionary<string, string>();
+            SystemProperties = new Dictionary<string, string>();
+        }
+
+        /// <summary>
+        /// Name of the topic the message belongs to.
+        /// </summary>
+        public string Topic { get; set; }
+
+        /// <summary>
+        /// Raw payload bytes.
+        /// </summary>
+        public byte[] Body { get; set; }
+
+        /// <summary>
+        /// Optional tag; null when the message was created without one.
+        /// </summary>
+        public string Tag { get; set; }
+
+        /// <summary>
+        /// Keys attached to the message; stored exactly as supplied by the caller.
+        /// </summary>
+        public List<string> Keys { get; set; }
+
+        /// <summary>
+        /// Caller-defined key/value properties.
+        /// </summary>
+        public Dictionary<string, string> UserProperties { get; set; }
+
+        /// <summary>
+        /// Key/value properties that are only accessible inside this assembly.
+        /// </summary>
+        internal Dictionary<string, string> SystemProperties { get; set; }
+
+        /// <summary>
+        /// Attempt limit for this message; 3 unless changed.
+        /// </summary>
+        public int MaxAttemptTimes { get; set; }
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/MessageType.cs
similarity index 76%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/MessageType.cs
index d4102b8..376b658 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -1,29 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace org.apache.rocketmq
+{
+    // Message kinds known to this client. NOTE(review): per-member semantics are not defined in this file — confirm.
+    public enum MessageType {
+        Normal,
+        Fifo,
+        Delay,
+        Transaction,
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
new file mode 100644
index 0000000..ed837bc
--- /dev/null
+++ b/rocketmq-client-csharp/Producer.cs
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading.Tasks;
+using rmq = apache.rocketmq.v1;
+using pb = global::Google.Protobuf;
+using grpc = global::Grpc.Core;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+
+namespace org.apache.rocketmq
+{
+    // Producer that sends messages to brokers picked per topic by a PublishLoadBalancer.
+    public class Producer : Client, IProducer
+    {
+        public Producer(INameServerResolver resolver) : base(resolver)
+        {
+            this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+        }
+
+        public override void start()
+        {
+            base.start();
+            // More initialization
+        }
+
+        public override void shutdown()
+        {
+            // Release local resources
+            base.shutdown();
+        }
+
+        // Currently contributes nothing to the heartbeat request.
+        public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+        }
+
+        // Sends the message to the candidates selected for its topic, one after another, and returns
+        // on the first OK reply. Throws TopicRouteException when the topic has no route partitions;
+        // otherwise rethrows the last send failure, or a plain Exception when no candidate replied OK.
+        public async Task<SendResult> send(Message message)
+        {
+            PublishLoadBalancer publishLB;
+            if (!loadBalancer.TryGetValue(message.Topic, out publishLB))
+            {
+                var topicRouteData = await getRouteFor(message.Topic, false);
+                if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+                {
+                    throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
+                }
+                // GetOrAdd keeps whichever balancer was registered first, so concurrent senders share one.
+                publishLB = loadBalancer.GetOrAdd(message.Topic, new PublishLoadBalancer(topicRouteData));
+            }
+
+            var request = new rmq::SendMessageRequest();
+            request.Message = new rmq::Message();
+            request.Message.Body = pb::ByteString.CopyFrom(message.Body);
+            request.Message.Topic = new rmq::Resource();
+            request.Message.Topic.ResourceNamespace = resourceNamespace();
+            request.Message.Topic.Name = message.Topic;
+
+            // User properties
+            foreach (var item in message.UserProperties)
+            {
+                request.Message.UserAttribute.Add(item.Key, item.Value);
+            }
+
+            request.Message.SystemAttribute = new rmq::SystemAttribute();
+            if (!string.IsNullOrEmpty(message.Tag))
+            {
+                request.Message.SystemAttribute.Tag = message.Tag;
+            }
+
+            // Keys can be null: Message stores whatever list its constructor or setter was given.
+            if (null != message.Keys)
+            {
+                foreach (var key in message.Keys)
+                {
+                    request.Message.SystemAttribute.Keys.Add(key);
+                }
+            }
+
+            List<string> targets = new List<string>();
+            foreach (var partition in publishLB.select(message.MaxAttemptTimes))
+            {
+                targets.Add(partition.Broker.targetUrl());
+            }
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+
+            // Keep the last failure with its original stack trace; "throw ex" would reset the trace.
+            System.Runtime.ExceptionServices.ExceptionDispatchInfo lastFailure = null;
+            foreach (var target in targets)
+            {
+                try
+                {
+                    rmq::SendMessageResponse response = await clientManager.sendMessage(target, metadata, request, getIoTimeout());
+                    if (null != response && (int)global::Google.Rpc.Code.Ok == response.Common.Status.Code)
+                    {
+                        return new SendResult(response.MessageId);
+                    }
+                }
+                catch (Exception e)
+                {
+                    lastFailure = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e);
+                }
+            }
+
+            if (null != lastFailure)
+            {
+                lastFailure.Throw();
+            }
+
+            throw new Exception("Send message failed");
+        }
+
+        private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
new file mode 100644
index 0000000..9a1b66d
--- /dev/null
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * Chooses publish targets for a topic by rotating over the partitions of its route
+     * whose permission is neither NONE nor READ.
+     */
+    public class PublishLoadBalancer
+    {
+        // Builds the balancer from a route; the rotation starts at a random position.
+        public PublishLoadBalancer(TopicRouteData route)
+        {
+            this.partitions = writablePartitions(route);
+
+            Random random = new Random();
+            this.roundRobinIndex = random.Next(0, this.partitions.Count);
+        }
+
+        // Replaces the partition list with the one derived from the given route.
+        public void update(TopicRouteData route)
+        {
+            this.partitions = writablePartitions(route);
+        }
+
+        /**
+         * Returns the sorted partitions of the route, leaving out those whose permission is NONE or READ.
+         */
+        private static List<Partition> writablePartitions(TopicRouteData route)
+        {
+            List<Partition> writable = new List<Partition>();
+            foreach (var partition in route.Partitions)
+            {
+                if (Permission.NONE == partition.Permission || Permission.READ == partition.Permission)
+                {
+                    continue;
+                }
+
+                writable.Add(partition);
+            }
+
+            writable.Sort();
+            return writable;
+        }
+
+        /**
+         * Accept a partition iff its broker is different.
+         */
+        private bool accept(List<Partition> existing, Partition partition)
+        {
+            foreach (var item in existing)
+            {
+                if (item.Broker.Equals(partition.Broker))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Walks the partition list from the next round-robin position and collects partitions
+         * whose brokers differ from each other, stopping once maxAttemptTimes were picked.
+         * Returns an empty list when there are no partitions.
+         */
+        public List<Partition> select(int maxAttemptTimes)
+        {
+            List<Partition> result = new List<Partition>();
+
+            // Snapshot the field: update() may swap the list while a selection is in progress.
+            List<Partition> all = this.partitions;
+            if (0 == all.Count)
+            {
+                return result;
+            }
+
+            // Atomic increment: Producer shares one balancer per topic between concurrent sends,
+            // and a plain ++ could lose updates there.
+            int start = System.Threading.Interlocked.Increment(ref roundRobinIndex);
+
+            for (int i = 0; i < all.Count; i++)
+            {
+                // Masking with int.MaxValue keeps the index non-negative after the counter wraps around.
+                int idx = ((start + i) & int.MaxValue) % all.Count;
+                if (accept(result, all[idx]))
+                {
+                    result.Add(all[idx]);
+                    if (result.Count >= maxAttemptTimes)
+                    {
+                        break;
+                    }
+                }
+            }
+
+            return result;
+        }
+
+        private List<Partition> partitions;
+
+        private int roundRobinIndex;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 0cc8354..0191e91 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -25,7 +25,8 @@ namespace org.apache.rocketmq {
             stub = client;
         }
 
-        public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions) {
+        public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions)
+        {
             var call = stub.QueryRouteAsync(request, callOptions);
             var response = await call.ResponseAsync;
             var status = call.GetStatus();
@@ -35,6 +36,26 @@ namespace org.apache.rocketmq {
             return response;
         }
 
+        // Starts the Heartbeat call on the stub and awaits its response.
+        public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions)
+        {
+            var pending = stub.HeartbeatAsync(request, callOptions);
+            return await pending.ResponseAsync;
+        }
+
+        // Starts the NotifyClientTermination call on the stub and awaits its response.
+        public async Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions)
+        {
+            var pending = stub.NotifyClientTerminationAsync(request, callOptions);
+            return await pending.ResponseAsync;
+        }
+
+        // Starts the SendMessage call on the stub and awaits its response.
+        public async Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions)
+        {
+            return await stub.SendMessageAsync(request, callOptions).ResponseAsync;
+        }
+
         private MessagingService.MessagingServiceClient stub;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index 4f1fd93..70e038a 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -22,7 +22,7 @@ using System.Security.Cryptography;
 namespace org.apache.rocketmq {
     public class Signature {
         public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
-            metadata.Add(MetadataConstants.LANGUAGE_KEY, "C#");
+            metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
             if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
                 metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/StaticNameServerResolver.cs
similarity index 69%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/StaticNameServerResolver.cs
index d4102b8..9f97599 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/StaticNameServerResolver.cs
@@ -1,29 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+    // Name server resolver that always answers with the list supplied at construction.
+    public class StaticNameServerResolver : INameServerResolver
+    {
+        public StaticNameServerResolver(List<string> nameServerList)
+        {
+            this.nameServerList = nameServerList;
+        }
+
+        // Nothing to await here, so an already-completed task is returned instead of using "async".
+        public Task<List<string>> resolveAsync()
+        {
+            return Task.FromResult(nameServerList);
+        }
+        private readonly List<string> nameServerList;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/TopicRouteException.cs
similarity index 76%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/TopicRouteException.cs
index d4102b8..b520e72 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/TopicRouteException.cs
@@ -1,29 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+namespace org.apache.rocketmq
+{
+    // Signals a topic routing failure; Producer.send throws it when no route is found for a topic.
+    public class TopicRouteException : Exception
+    {
+        // message: human-readable description, forwarded to the Exception base class.
+        public TopicRouteException(string message) : base(message)
+        {
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
new file mode 100644
index 0000000..0f8bff7
--- /dev/null
+++ b/tests/ClientManagerTest.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq {
+
+    [TestClass]
+    public class ClientManagerTest {
+        // Integration test: calls the endpoint below over the network and reads credentials from ${HOME}/.rocketmq/config.
+        [TestMethod]
+        public async Task testResolveRoute() {
+            string topic = "cpp_sdk_standard";
+            string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+            var request = new rmq::QueryRouteRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = resourceNamespace;
+            request.Topic.Name = topic;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Host = "116.62.231.199";
+            address.Port = 80;
+            request.Endpoints.Addresses.Add(address);
+
+            var metadata = new grpc::Metadata();
+            var clientConfig = new ClientConfig();
+            clientConfig.CredentialsProvider = new ConfigFileCredentialsProvider();
+            clientConfig.ResourceNamespace = resourceNamespace;
+            clientConfig.Region = "cn-hangzhou-pre";
+            Signature.sign(clientConfig, metadata);
+            var clientManager = new ClientManager();
+            string target = "https://116.62.231.199:80";
+            var topicRouteData = await clientManager.resolveRoute(target, metadata, request, TimeSpan.FromSeconds(3));
+            Assert.IsNotNull(topicRouteData);
+            Console.WriteLine(topicRouteData);
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/tests/ConfigFileCredentialsProviderTest.cs
similarity index 70%
copy from rocketmq-client-csharp/IClientManager.cs
copy to tests/ConfigFileCredentialsProviderTest.cs
index ea8ba55..f94d364 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -15,16 +15,17 @@
  * limitations under the License.
  */
 
-using apache.rocketmq.v1;
-using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
-using grpc = global::Grpc.Core;
 
 namespace org.apache.rocketmq {
-    public interface IClientManager {
-        IRpcClient getRpcClient(string target);
-
-        Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
-
+    [TestClass]
+    public class ConfigFileCredentialsProviderTest {
+        // getCredentials() on a default-constructed provider must not return null.
+        [TestMethod]
+        public void testGetCredentials() {
+            var credentialsProvider = new ConfigFileCredentialsProvider();
+            Assert.IsNotNull(credentialsProvider.getCredentials());
+        }
     }
 }
\ No newline at end of file
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
new file mode 100644
index 0000000..eebdc46
--- /dev/null
+++ b/tests/MessageTest.cs
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Text;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq {
+    /// <summary>
+    /// Unit tests for the Message constructors and the property values they produce.
+    /// </summary>
+    [TestClass]
+    public class MessageTest {
+        [TestMethod]
+        public void testCtor() {
+            var msg1 = new Message();
+            Assert.IsNull(msg1.Topic);
+            Assert.IsNull(msg1.Body);
+            Assert.IsNull(msg1.Tag);
+            Assert.AreEqual(0, msg1.Keys.Count);
+            Assert.AreEqual(0, msg1.UserProperties.Count);
+        }
+
+        [TestMethod]
+        public void testCtor2() {
+            string topic = "T1";
+            string bodyString = "body";
+            byte[] body = Encoding.ASCII.GetBytes(bodyString);
+            var msg1 = new Message(topic, body);
+            Assert.AreEqual(topic, msg1.Topic);
+            Assert.AreEqual(body, msg1.Body);
+            Assert.IsNull(msg1.Tag);
+            Assert.AreEqual(0, msg1.Keys.Count);
+            Assert.AreEqual(0, msg1.UserProperties.Count);
+        }
+
+        [TestMethod]
+        public void testCtor3() {
+            string topic = "T1";
+            string bodyString = "body";
+            byte[] body = Encoding.ASCII.GetBytes(bodyString);
+            string tag = "TagA";
+            var msg1 = new Message(topic, tag, body);
+            Assert.AreEqual(topic, msg1.Topic);
+            Assert.AreEqual(body, msg1.Body);
+            Assert.AreEqual(tag, msg1.Tag);
+            Assert.AreEqual(0, msg1.Keys.Count);
+            Assert.AreEqual(0, msg1.UserProperties.Count);
+        }
+
+        [TestMethod]
+        public void testCtor4() {
+            string topic = "T1";
+            string bodyString = "body";
+            byte[] body = Encoding.ASCII.GetBytes(bodyString);
+            string tag = "TagA";
+            var keys = new List<string> { "Key1", "Key2" };
+
+            var msg1 = new Message(topic, tag, keys, body);
+            Assert.AreEqual(topic, msg1.Topic);
+            // byte[] and List<string> do not override Equals, so the next checks only
+            // pass when the message exposes the same instances it was constructed with.
+            Assert.AreEqual(body, msg1.Body);
+            Assert.AreEqual(tag, msg1.Tag);
+            Assert.AreEqual(keys, msg1.Keys);
+            Assert.AreEqual(0, msg1.UserProperties.Count);
+        }
+
+        [TestMethod]
+        public void testCtor5() {
+            string topic = "T1";
+            string bodyString = "body";
+            byte[] body = Encoding.ASCII.GetBytes(bodyString);
+            string tag = "TagA";
+            var keys = new List<string> { "Key1", "Key2" };
+
+            var msg1 = new Message(topic, tag, keys, body);
+
+            msg1.UserProperties.Add("k", "v");
+            msg1.UserProperties.Add("k2", "v2");
+
+            Assert.AreEqual(topic, msg1.Topic);
+            Assert.AreEqual(body, msg1.Body);
+            Assert.AreEqual(tag, msg1.Tag);
+            Assert.AreEqual(keys, msg1.Keys);
+            Assert.AreEqual(2, msg1.UserProperties.Count);
+
+            // Assert on the lookup result as well, so a missing key is reported as
+            // such instead of surfacing as a null-versus-value mismatch.
+            string value;
+            Assert.IsTrue(msg1.UserProperties.TryGetValue("k", out value));
+            Assert.AreEqual("v", value);
+
+            Assert.IsTrue(msg1.UserProperties.TryGetValue("k2", out value));
+            Assert.AreEqual("v2", value);
+        }
+    }
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
new file mode 100644
index 0000000..94a43c2
--- /dev/null
+++ b/tests/RpcClientTest.cs
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Grpc.Core.Interceptors;
+using System.Net.Http;
+using Grpc.Net.Client;
+using rmq = global::apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+using System;
+
+namespace org.apache.rocketmq
+{
+    /// <summary>
+    /// Tests that call the remote endpoint configured below through an RpcClient.
+    /// </summary>
+    [TestClass]
+    public class RpcClientTest
+    {
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            string target = string.Format("https://{0}:{1}", host, port);
+            // Keep the channel in a field so that TearDown can release it.
+            channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            {
+                HttpHandler = ClientManager.createHttpHandler()
+            });
+            var invoker = channel.Intercept(new ClientLoggerInterceptor());
+            var client = new rmq::MessagingService.MessagingServiceClient(invoker);
+            rpcClient = new RpcClient(client);
+
+            clientConfig = new ClientConfig();
+            clientConfig.CredentialsProvider = new ConfigFileCredentialsProvider();
+            clientConfig.ResourceNamespace = resourceNamespace;
+            clientConfig.Region = "cn-hangzhou-pre";
+        }
+
+        [ClassCleanup]
+        public static void TearDown()
+        {
+            // GrpcChannel is IDisposable; dispose it once every test has run.
+            channel?.Dispose();
+        }
+
+        [TestMethod]
+        public void testQueryRoute()
+        {
+            var request = new rmq::QueryRouteRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = resourceNamespace;
+            request.Topic.Name = topic;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Host = host;
+            address.Port = port;
+            request.Endpoints.Addresses.Add(address);
+
+            var response = rpcClient.queryRoute(request, createCallOptions()).GetAwaiter().GetResult();
+            Assert.IsNotNull(response);
+        }
+
+        [TestMethod]
+        public void testHeartbeat()
+        {
+            var request = new rmq::HeartbeatRequest();
+            request.ClientId = clientId;
+            request.ProducerData = new rmq::ProducerData();
+            request.ProducerData.Group = new rmq::Resource();
+            request.ProducerData.Group.ResourceNamespace = resourceNamespace;
+            // Send the producer group id rather than the topic name.
+            request.ProducerData.Group.Name = group;
+            request.FifoFlag = false;
+
+            var response = rpcClient.heartbeat(request, createCallOptions()).GetAwaiter().GetResult();
+            Assert.IsNotNull(response);
+        }
+
+        // Builds call options with freshly signed metadata and a deadline three seconds from now.
+        private static grpc::CallOptions createCallOptions()
+        {
+            var metadata = new grpc::Metadata();
+            Signature.sign(clientConfig, metadata);
+            return new grpc::CallOptions(metadata, DateTime.UtcNow.Add(TimeSpan.FromSeconds(3)));
+        }
+
+        // Fixed settings shared by every test in this class.
+        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+        private static string topic = "cpp_sdk_standard";
+        private static string clientId = "C001";
+        private static string group = "GID_cpp_sdk_standard";
+        private static string host = "116.62.231.199";
+        private static int port = 80;
+
+        private static GrpcChannel channel;
+        private static IRpcClient rpcClient;
+        private static ClientConfig clientConfig;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/tests/StaticNameServerResolverTest.cs
similarity index 62%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to tests/StaticNameServerResolverTest.cs
index d4102b8..88955e9 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/tests/StaticNameServerResolverTest.cs
@@ -1,29 +1,35 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
-    public interface IRpcClient
-    {
-     Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+    [TestClass]
+    public class StaticNameServerResolverTest
+    {
+        // resolveAsync() must hand back the very list instance given to the constructor.
+        [TestMethod]
+        public void testResolve()
+        {
+            var nameServers = new List<string> { "https://localhost:80" };
+            var resolver = new StaticNameServerResolver(nameServers);
+            var resolved = resolver.resolveAsync().GetAwaiter().GetResult();
+            Assert.AreSame(nameServers, resolved);
+        }
+    }
+}
\ No newline at end of file