You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/20 02:39:31 UTC

[rocketmq-clients] branch csharp_dev created (now a2bca73)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


      at a2bca73  Merge changes from upstream

This branch includes the following new commits:

     new a2bca73  Merge changes from upstream

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-clients] 01/01: Merge changes from upstream

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit a2bca73ad47a6b47e0417e389939e792616a8537
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Jul 20 10:36:10 2022 +0800

    Merge changes from upstream
---
 csharp/examples/Program.cs                         |  17 +-
 .../{TopicRouteException.cs => AccessPoint.cs}     |  24 +-
 csharp/rocketmq-client-csharp/Client.cs            | 429 +++++++++++++++-----
 csharp/rocketmq-client-csharp/ClientConfig.cs      | 164 ++++++--
 .../ClientLoggerInterceptor.cs                     |  22 +-
 csharp/rocketmq-client-csharp/ClientManager.cs     | 362 ++++++++++++-----
 .../rocketmq-client-csharp/ClientManagerFactory.cs |   2 +-
 .../ConfigFileCredentialsProvider.cs               |  30 +-
 csharp/rocketmq-client-csharp/Credentials.cs       |  32 +-
 .../{ICredentialsProvider.cs => ExpressionType.cs} |  11 +-
 ...{TopicRouteException.cs => FilterExpression.cs} |  13 +-
 csharp/rocketmq-client-csharp/IClient.cs           |  15 +-
 csharp/rocketmq-client-csharp/IClientConfig.cs     |   8 +-
 csharp/rocketmq-client-csharp/IClientManager.cs    |  31 +-
 .../{MessageType.cs => IConsumer.cs}               |  15 +-
 .../rocketmq-client-csharp/ICredentialsProvider.cs |   6 +-
 .../{IClient.cs => IMessageListener.cs}            |  14 +-
 csharp/rocketmq-client-csharp/IProducer.cs         |  15 +-
 csharp/rocketmq-client-csharp/IRpcClient.cs        |  38 +-
 csharp/rocketmq-client-csharp/Message.cs           | 122 +++---
 ...{TopicRouteException.cs => MessageException.cs} |  11 +-
 .../rocketmq-client-csharp/MessageIdGenerator.cs   |   2 +-
 csharp/rocketmq-client-csharp/MessageType.cs       |   7 +-
 csharp/rocketmq-client-csharp/MetadataConstants.cs |   8 +-
 csharp/rocketmq-client-csharp/MqLogManager.cs      |   2 +-
 .../{TopicRouteException.cs => ProcessQueue.cs}    |  21 +-
 csharp/rocketmq-client-csharp/Producer.cs          | 154 +++++--
 .../Protos/apache/rocketmq/v2/admin.proto          |  43 ++
 .../Protos/apache/rocketmq/v2/definition.proto     | 443 ++++++++++++++++++++
 .../Protos/apache/rocketmq/v2/service.proto        | 445 +++++++++++++++++++++
 .../rocketmq-client-csharp/PublishLoadBalancer.cs  |  47 +--
 .../{IClient.cs => Publishing.cs}                  |  17 +-
 csharp/rocketmq-client-csharp/PushConsumer.cs      | 261 ++++++++++++
 csharp/rocketmq-client-csharp/RpcClient.cs         | 177 ++++++--
 .../{TopicRouteException.cs => SendReceipt.cs}     |  29 +-
 csharp/rocketmq-client-csharp/SendStatus.cs        |   6 +-
 csharp/rocketmq-client-csharp/SequenceGenerator.cs |   7 +-
 csharp/rocketmq-client-csharp/Session.cs           | 122 ++++++
 csharp/rocketmq-client-csharp/Signature.cs         |  27 +-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 275 +++++++++++++
 .../StaticCredentialsProvider.cs                   |  12 +-
 csharp/rocketmq-client-csharp/Topic.cs             |  74 ++--
 csharp/rocketmq-client-csharp/TopicRouteData.cs    |  46 +--
 .../rocketmq-client-csharp/TopicRouteException.cs  |   2 +-
 csharp/rocketmq-client-csharp/Utilities.cs         |  28 +-
 .../rocketmq-client-csharp.csproj                  |  14 +-
 csharp/tests/ClientConfigTest.cs                   |   9 +-
 csharp/tests/ClientManagerTest.cs                  |  21 +-
 csharp/tests/ConfigFileCredentialsProviderTest.cs  |   9 +-
 csharp/tests/DateTimeTest.cs                       |  17 +-
 csharp/tests/MessageIdGeneratorTest.cs             |   2 +-
 csharp/tests/MessageTest.cs                        |  21 +-
 csharp/tests/MqLogManagerTest.cs                   |   2 +-
 csharp/tests/ProducerTest.cs                       | 157 +++++++-
 csharp/tests/PushConsumerTest.cs                   | 119 ++++++
 csharp/tests/RpcClientTest.cs                      | 211 +++++-----
 csharp/tests/SendResultTest.cs                     |  18 +-
 csharp/tests/SequenceGeneratorTest.cs              |   2 +-
 csharp/tests/SignatureTest.cs                      |   8 +-
 csharp/tests/SimpleConsumerTest.cs                 | 108 +++++
 csharp/tests/StaticCredentialsProviderTest.cs      |   9 +-
 csharp/tests/TopicTest.cs                          |  23 +-
 csharp/tests/UnitTest1.cs                          |  87 ++--
 63 files changed, 3660 insertions(+), 813 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index 9bf745c..09a1674 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -5,20 +5,24 @@ using System.Threading;
 namespace examples
 {
 
-    class Foo {
+    class Foo
+    {
         public int bar = 1;
     }
     class Program
     {
 
-        static void RT(Action action, int seconds, CancellationToken token) {
-            if (null == action) {
+        static void RT(Action action, int seconds, CancellationToken token)
+        {
+            if (null == action)
+            {
                 return;
             }
 
             Task.Run(async () =>
             {
-                while(!token.IsCancellationRequested) {
+                while (!token.IsCancellationRequested)
+                {
                     action();
                     await Task.Delay(TimeSpan.FromSeconds(seconds), token);
                 }
@@ -31,7 +35,7 @@ namespace examples
 
             string accessKey = "key";
             string accessSecret = "secret";
-            var credentials = new org.apache.rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();
+            var credentials = new Org.Apache.Rocketmq.StaticCredentialsProvider(accessKey, accessSecret).getCredentials();
             bool expired = credentials.expired();
 
             int workerThreads;
@@ -44,7 +48,8 @@ namespace examples
             ThreadPool.QueueUserWorkItem((Object stateInfo) =>
             {
                 Console.WriteLine("From ThreadPool");
-                if (stateInfo is Foo) {
+                if (stateInfo is Foo)
+                {
                     Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar);
                 }
             }, new Foo());
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs
similarity index 66%
copy from csharp/rocketmq-client-csharp/TopicRouteException.cs
copy to csharp/rocketmq-client-csharp/AccessPoint.cs
index b520e72..cf4e1f4 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -14,15 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public class TopicRouteException : Exception
+    public class AccessPoint
     {
-        public TopicRouteException(string message) : base(message)
+        private string _host;
+
+        public string Host
         {
+            get { return _host; }
+            set { _host = value; }
+        }
+
+        private int _port;
 
+        public int Port
+        {
+            get { return _port; }
+            set { _port = value; }
         }
 
+        public string TargetUrl()
+        {
+            return $"https://{_host}:{_port}";
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index dac6d89..32dffae 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -19,113 +19,176 @@ using System.Collections.Generic;
 using System.Collections.Concurrent;
 using System.Threading.Tasks;
 using System.Threading;
+using System.Diagnostics;
 using System;
-using rmq = apache.rocketmq.v1;
+using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
+using NLog;
+using System.Diagnostics.Metrics;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
 
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public abstract class Client : ClientConfig, IClient
     {
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public Client(INameServerResolver resolver, string resourceNamespace)
+        protected Client(AccessPoint accessPoint, string resourceNamespace)
         {
-            this.nameServerResolver = resolver;
-            this.resourceNamespace_ = resourceNamespace;
-            this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace);
-            this.nameServerResolverCTS = new CancellationTokenSource();
+            _accessPoint = accessPoint;
 
-            this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
-            this.updateTopicRouteCTS = new CancellationTokenSource();
+            // Support IPv4 for now
+            AccessPointScheme = rmq::AddressScheme.Ipv4;
+            var serviceEndpoint = new rmq::Address();
+            serviceEndpoint.Host = accessPoint.Host;
+            serviceEndpoint.Port = accessPoint.Port;
+            AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
+
+            _resourceNamespace = resourceNamespace;
+
+            _clientSettings = new rmq::Settings();
+
+            _clientSettings.AccessPoint = new rmq::Endpoints();
+            _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+
+            _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+
+            _clientSettings.UserAgent = new rmq.UA();
+            _clientSettings.UserAgent.Language = rmq::Language.DotNet;
+            _clientSettings.UserAgent.Version = "5.0.0";
+            _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
+            _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+
+            Manager = ClientManagerFactory.getClientManager(resourceNamespace);
+
+            _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
+            _updateTopicRouteCts = new CancellationTokenSource();
+
+            _healthCheckCts = new CancellationTokenSource();
+
+            telemetryCts_ = new CancellationTokenSource();
         }
 
-        public virtual void start()
+        public virtual async Task Start()
         {
             schedule(async () =>
             {
-                await updateNameServerList();
-            }, 30, nameServerResolverCTS.Token);
+                await UpdateTopicRoute();
 
-            schedule(async () =>
-            {
-                await updateTopicRoute();
+            }, 30, _updateTopicRouteCts.Token);
+
+            // Get routes for topics of interest.
+            await UpdateTopicRoute();
+
+            string accessPointUrl = _accessPoint.TargetUrl();
+            createSession(accessPointUrl);
 
-            }, 30, updateTopicRouteCTS.Token);
+            await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
 
+            await Heartbeat();
         }
 
-        public virtual void shutdown()
+        public virtual async Task Shutdown()
         {
-            updateTopicRouteCTS.Cancel();
-            nameServerResolverCTS.Cancel();
+            Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
+            _updateTopicRouteCts.Cancel();
+            telemetryCts_.Cancel();
+            await Manager.Shutdown();
         }
 
-        private async Task updateNameServerList()
+        protected string FilterBroker(Func<string, bool> acceptor)
         {
-            List<string> nameServers = await nameServerResolver.resolveAsync();
-            if (0 == nameServers.Count)
+            foreach (var item in _topicRouteTable)
             {
-                // Whoops, something should be wrong. We got an empty name server list.
-                return;
+                foreach (var partition in item.Value.MessageQueues)
+                {
+                    string target = Utilities.TargetUrl(partition);
+                    if (acceptor(target))
+                    {
+                        return target;
+                    }
+                }
             }
+            return null;
+        }
 
-            if (nameServers.Equals(this.nameServers))
+        /**
+         * Return all endpoints of brokers in route table.
+         */
+        private List<string> AvailableBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            foreach (var item in _topicRouteTable)
             {
-                return;
+                foreach (var partition in item.Value.MessageQueues)
+                {
+                    string endpoint = Utilities.TargetUrl(partition);
+                    if (!endpoints.Contains(endpoint))
+                    {
+                        endpoints.Add(endpoint);
+                    }
+                }
             }
-
-            // Name server list is updated. 
-            // TODO: Locking is required
-            this.nameServers = nameServers;
-            this.currentNameServerIndex = 0;
+            return endpoints;
         }
 
-        private async Task updateTopicRoute()
+        private async Task UpdateTopicRoute()
         {
-            if (null == nameServers || 0 == nameServers.Count)
+            HashSet<string> topics = new HashSet<string>();
+            foreach (var topic in topicsOfInterest_)
             {
-                List<string> list = await nameServerResolver.resolveAsync();
-                if (null != list && 0 != list.Count)
-                {
-                    this.nameServers = list;
-                }
-                else
-                {
-                    // TODO: log warning here.
-                    return;
-                }
+                topics.Add(topic);
             }
 
-            // We got one or more name servers available.
-            string nameServer = nameServers[currentNameServerIndex];
+            foreach (var item in _topicRouteTable)
+            {
+                topics.Add(item.Key);
+            }
+            Logger.Debug($"Fetch topic route for {topics.Count} topics");
+
+            // Wrap topics into list such that we can map async result to topic 
+            List<string> topicList = new List<string>();
+            topicList.AddRange(topics);
 
             List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
-            foreach (var item in topicRouteTable)
+            foreach (var item in topicList)
             {
-                tasks.Add(getRouteFor(item.Key, true));
+                tasks.Add(GetRouteFor(item, true));
             }
 
             // Update topic route data
             TopicRouteData[] result = await Task.WhenAll(tasks);
+            var i = 0;
             foreach (var item in result)
             {
                 if (null == item)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, null response");
+                    ++i;
                     continue;
                 }
 
-                if (0 == item.Partitions.Count)
+                if (0 == item.MessageQueues.Count)
                 {
+                    Logger.Warn($"Failed to fetch route for {topicList[i]}, empty message queue");
+                    ++i;
                     continue;
                 }
 
-                var topicName = item.Partitions[0].Topic.Name;
-                var existing = topicRouteTable[topicName];
+                var topicName = item.MessageQueues[0].Topic.Name;
+
+                // Sanity check: the i-th result must belong to the i-th requested topic.
+                Debug.Assert(topicName.Equals(topicList[i]));
+
+                var existing = _topicRouteTable[topicName];
                 if (!existing.Equals(item))
                 {
-                    topicRouteTable[topicName] = item;
+                    _topicRouteTable[topicName] = item;
                 }
+                ++i;
             }
         }
 
@@ -154,74 +217,210 @@ namespace org.apache.rocketmq
          * direct
          *    Indicate if we should by-pass cache and fetch route entries from name server.
          */
-        public async Task<TopicRouteData> getRouteFor(string topic, bool direct)
+        public async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
         {
-            if (!direct && topicRouteTable.ContainsKey(topic))
+            if (!direct && _topicRouteTable.ContainsKey(topic))
             {
-                return topicRouteTable[topic];
+                return _topicRouteTable[topic];
             }
 
-            if (null == nameServers || 0 == nameServers.Count)
+            // Build a route query that carries the configured access point endpoints.
+            var request = new rmq::QueryRouteRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = AccessPointScheme;
+            foreach (var address in AccessPointEndpoints)
             {
-                List<string> list = await nameServerResolver.resolveAsync();
-                if (null != list && 0 != list.Count)
+                request.Endpoints.Addresses.Add(address);
+            }
+
+            var metadata = new grpc.Metadata();
+            Signature.sign(this, metadata);
+            int index = _random.Next(0, AccessPointEndpoints.Count);
+            var serviceEndpoint = AccessPointEndpoints[index];
+            // AccessPointAddresses.Count
+            string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+            TopicRouteData topicRouteData;
+            try
+            {
+                Logger.Debug($"Resolving route for topic={topic}");
+                topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
+                if (null != topicRouteData)
                 {
-                    this.nameServers = list;
+                    Logger.Debug($"Got route entries for {topic} from name server");
+                    _topicRouteTable.TryAdd(topic, topicRouteData);
+                    return topicRouteData;
                 }
                 else
                 {
-                    // TODO: log warning here.
-                    return null;
+                    Logger.Warn($"Failed to query route of {topic} from {target}");
                 }
             }
+            catch (Exception e)
+            {
+                Logger.Warn(e, "Failed when querying route");
+            }
 
-            // We got one or more name servers available.
-            string nameServer = nameServers[currentNameServerIndex];
-            var request = new rmq::QueryRouteRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace_;
-            request.Topic.Name = topic;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
-            var address = new rmq::Address();
-            int pos = nameServer.LastIndexOf(':');
-            address.Host = nameServer.Substring(0, pos);
-            address.Port = Int32.Parse(nameServer.Substring(pos + 1));
-            request.Endpoints.Addresses.Add(address);
-            var target = string.Format("https://{0}:{1}", address.Host, address.Port);
-            var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
-            var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
-            return topicRouteData;
+            return null;
         }
 
-        public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
+        protected abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
 
-        public void heartbeat()
+        public async Task Heartbeat()
         {
-            List<string> endpoints = endpointsInUse();
+            List<string> endpoints = AvailableBrokerEndpoints();
             if (0 == endpoints.Count)
             {
+                Logger.Debug("No broker endpoints available in topic route");
                 return;
             }
 
-            var heartbeatRequest = new rmq::HeartbeatRequest();
-            prepareHeartbeatData(heartbeatRequest);
+            var request = new rmq::HeartbeatRequest();
+            PrepareHeartbeatData(request);
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
+
+            List<Task> tasks = new List<Task>();
+            foreach (var endpoint in endpoints)
+            {
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
+            }
+
+            await Task.WhenAll(tasks);
         }
 
-        public void healthCheck()
+        private List<string> BlockedBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            return endpoints;
+        }
+
+        private void RemoveFromBlockList(string endpoint)
         {
 
         }
 
-        public async Task<bool> notifyClientTermination()
+        protected async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group)
         {
-            List<string> endpoints = endpointsInUse();
+            // Pick the first broker found in the route table (the acceptor accepts any).
+            string target = FilterBroker((s) => true);
+            var request = new rmq::QueryAssignmentRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = AccessPointScheme;
+            foreach (var endpoint in AccessPointEndpoints)
+            {
+                request.Endpoints.Addresses.Add(endpoint);
+            }
+            try
+            {
+                var metadata = new grpc::Metadata();
+                Signature.sign(this, metadata);
+                return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
+            }
+            catch (System.Exception e)
+            {
+                Logger.Warn(e, $"Failed to acquire load assignments from {target}");
+            }
+            // Just return an empty list.
+            return new List<rmq.Assignment>();
+        }
+
+        private string TargetUrl(rmq::Assignment assignment)
+        {
+            var broker = assignment.MessageQueue.Broker;
+            var addresses = broker.Endpoints.Addresses;
+            // TODO: use the first address for now. 
+            var address = addresses[0];
+            return $"https://{address.Host}:{address.Port}";
+        }
+
+        public virtual void BuildClientSetting(rmq::Settings settings)
+        {
+            settings.MergeFrom(_clientSettings);
+        }
+
+        public void createSession(string url)
+        {
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var stream = Manager.Telemetry(url, metadata);
+            var session = new Session(url, stream, this);
+            _sessions.TryAdd(url, session);
+            Task.Run(async () =>
+            {
+                await session.Loop();
+            });
+        }
+
+
+        public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+        {
+            var targetUrl = TargetUrl(assignment);
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var request = new rmq::ReceiveMessageRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+            request.MessageQueue = assignment.MessageQueue;
+            var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+            return messages;
+        }
+
+        public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
+        {
+            var request = new rmq::AckMessageRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            var entry = new rmq::AckMessageEntry();
+            entry.ReceiptHandle = receiptHandle;
+            entry.MessageId = messageId;
+            request.Entries.Add(entry);
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            return await Manager.Ack(target, metadata, request, RequestTimeout);
+        }
+
+        public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
+        {
+            var request = new rmq::ChangeInvisibleDurationRequest();
+            request.ReceiptHandle = receiptHandle;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            request.MessageId = messageId;
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
+        }
+
+        public async Task<bool> NotifyClientTermination()
+        {
+            List<string> endpoints = AvailableBrokerEndpoints();
             var request = new rmq::NotifyClientTerminationRequest();
-            request.ClientId = clientId();
+
 
             var metadata = new grpc.Metadata();
             Signature.sign(this, metadata);
@@ -230,7 +429,7 @@ namespace org.apache.rocketmq
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(clientManager.notifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
@@ -244,19 +443,53 @@ namespace org.apache.rocketmq
             return true;
         }
 
-        private List<string> endpointsInUse()
+        public virtual void OnSettingsReceived(rmq::Settings settings)
         {
-            //TODO: gather endpoints from route entries.
-            return new List<string>();
+            if (null != settings.Metric)
+            {
+                _clientSettings.Metric = new rmq::Metric();
+                _clientSettings.Metric.MergeFrom(settings.Metric);
+            }
+
+            if (null != settings.BackoffPolicy)
+            {
+                _clientSettings.BackoffPolicy = new rmq::RetryPolicy();
+                _clientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
+            }
+        }
+
+        protected readonly IClientManager Manager;
+
+        private readonly HashSet<string> topicsOfInterest_ = new HashSet<string>();
+
+        public void AddTopicOfInterest(string topic)
+        {
+            topicsOfInterest_.Add(topic);
         }
 
-        protected IClientManager clientManager;
-        private INameServerResolver nameServerResolver;
-        private CancellationTokenSource nameServerResolverCTS;
-        private List<string> nameServers;
-        private int currentNameServerIndex;
+        private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
+        private readonly CancellationTokenSource _updateTopicRouteCts;
+
+        private readonly CancellationTokenSource _healthCheckCts;
+
+        private readonly CancellationTokenSource telemetryCts_ = new CancellationTokenSource();
+
+        public CancellationTokenSource TelemetryCts()
+        {
+            return telemetryCts_;
+        }
+
+        protected readonly AccessPoint _accessPoint;
+
+        // This field is subject to changes from servers.
+        protected readonly rmq::Settings _clientSettings;
+
+        private readonly Random _random = new Random();
+        
+        protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
 
-        private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
-        private CancellationTokenSource updateTopicRouteCTS;
+        public static readonly string MeterName = "Apache.RocketMQ.Client";
+        
+        protected static readonly Meter MetricMeter = new(MeterName, "1.0");
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 949f8b4..0d99cb1 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -15,101 +15,134 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
-    public class ClientConfig : IClientConfig {
+    public class ClientConfig : IClientConfig
+    {
 
-        public ClientConfig() {
+        public ClientConfig()
+        {
             var hostName = System.Net.Dns.GetHostName();
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
             this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
-            this.ioTimeout_ = TimeSpan.FromSeconds(3);
-            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
+            this._requestTimeout = TimeSpan.FromSeconds(3);
+            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
+            this.client_type_ = rmq::ClientType.Unspecified;
+            this.access_point_ = new rmq::Endpoints();
+            this.back_off_policy_ = new rmq::RetryPolicy();
+            this._publishing = new Publishing();
         }
 
-        public string region() {
-            return region_;
+        public string region()
+        {
+            return _region;
         }
-        public string Region {
-            set { region_ = value; }
+        public string Region
+        {
+            set { _region = value; }
         }
 
-        public string serviceName() {
-            return serviceName_;
+        public string serviceName()
+        {
+            return _serviceName;
         }
-        public string ServiceName {
-            set { serviceName_ = value; }
+        public string ServiceName
+        {
+            set { _serviceName = value; }
         }
 
-        public string resourceNamespace() {
-            return resourceNamespace_;
+        public string resourceNamespace()
+        {
+            return _resourceNamespace;
         }
-        public string ResourceNamespace {
-            set { resourceNamespace_ = value; }
+        public string ResourceNamespace
+        {
+            get { return _resourceNamespace; }
+            set { _resourceNamespace = value; }
         }
 
-        public ICredentialsProvider credentialsProvider() {
+        public ICredentialsProvider credentialsProvider()
+        {
             return credentialsProvider_;
         }
-        
-        public ICredentialsProvider CredentialsProvider {
+
+        public ICredentialsProvider CredentialsProvider
+        {
             set { credentialsProvider_ = value; }
         }
 
-        public string tenantId() {
-            return tenantId_;
+        public string tenantId()
+        {
+            return _tenantId;
         }
-        public string TenantId {
-            set { tenantId_ = value; }
+        public string TenantId
+        {
+            set { _tenantId = value; }
         }
 
-        public TimeSpan getIoTimeout() {
-            return ioTimeout_;
-        }
-        public TimeSpan IoTimeout {
-            set { ioTimeout_ = value; }
+        public TimeSpan RequestTimeout
+        {
+            get
+            {
+                return _requestTimeout;
+            }
+            set
+            {
+                _requestTimeout = value;
+            }
         }
 
-        public TimeSpan getLongPollingTimeout() {
+        public TimeSpan getLongPollingTimeout()
+        {
             return longPollingIoTimeout_;
         }
-        public TimeSpan LongPollingTimeout {
+        public TimeSpan LongPollingTimeout
+        {
             set { longPollingIoTimeout_ = value; }
         }
 
-        public string getGroupName() {
+        public string getGroupName()
+        {
             return groupName_;
         }
-        public string GroupName {
+        public string GroupName
+        {
             set { groupName_ = value; }
         }
 
-        public string clientId() {
+        public string clientId()
+        {
             return clientId_;
         }
 
-        public bool isTracingEnabled() {
+        public bool isTracingEnabled()
+        {
             return tracingEnabled_;
         }
-        public bool TracingEnabled {
+        public bool TracingEnabled
+        {
             set { tracingEnabled_ = value; }
         }
 
-        public void setInstanceName(string instanceName) {
+        public void setInstanceName(string instanceName)
+        {
             this.instanceName_ = instanceName;
         }
 
-        private string region_ = "cn-hangzhou";
-        private string serviceName_ = "ONS";
+        private string _region = "cn-hangzhou";
+        private string _serviceName = "ONS";
 
-        protected string resourceNamespace_;
+        protected string _resourceNamespace;
 
         private ICredentialsProvider credentialsProvider_;
 
-        private string tenantId_;
+        private string _tenantId;
 
-        private TimeSpan ioTimeout_;
+        private TimeSpan _requestTimeout;
 
         private TimeSpan longPollingIoTimeout_;
 
@@ -120,6 +153,53 @@ namespace org.apache.rocketmq {
         private bool tracingEnabled_ = false;
 
         private string instanceName_ = "default";
+
+        private rmq::ClientType client_type_;
+        public rmq::ClientType ClientType
+        {
+            get { return client_type_; }
+            set { client_type_ = value; }
+        }
+
+
+        private rmq::Endpoints access_point_;
+
+        public rmq::AddressScheme AccessPointScheme
+        {
+            get { return access_point_.Scheme; }
+            set { access_point_.Scheme = value; }
+        }
+
+        public List<rmq::Address> AccessPointEndpoints
+        {
+            get
+            {
+                List<rmq::Address> addresses = new List<rmq::Address>();
+                foreach (var item in access_point_.Addresses)
+                {
+                    addresses.Add(item);
+                }
+                return addresses;
+            }
+
+            set
+            {
+                access_point_.Addresses.Clear();
+                foreach (var item in value)
+                {
+                    access_point_.Addresses.Add(item);
+                }
+            }
+        }
+
+        private rmq::RetryPolicy back_off_policy_;
+
+        private Publishing _publishing;
+        public Publishing Publishing
+        {
+            get { return _publishing; }
+        }
+
     }
 
 }
diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index 59ec0f2..01adddc 100644
--- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -18,11 +18,15 @@ using System;
 using System.Threading.Tasks;
 using Grpc.Core;
 using Grpc.Core.Interceptors;
+using NLog;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class ClientLoggerInterceptor : Interceptor
     {
+
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
         public override TResponse BlockingUnaryCall<TRequest, TResponse>(
             TRequest request,
             ClientInterceptorContext<TRequest, TResponse> context,
@@ -52,19 +56,12 @@ namespace org.apache.rocketmq
             try
             {
                 var response = await t;
-                Console.WriteLine($"Response received: {response}");
+                Logger.Debug($"Response received: {response}");
                 return response;
             }
             catch (Exception ex)
             {
-                // Log error to the console.
-                // Note: Configuring .NET Core logging is the recommended way to log errors
-                // https://docs.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
-                var initialColor = Console.ForegroundColor;
-                Console.ForegroundColor = ConsoleColor.Red;
-                Console.WriteLine($"Call error: {ex.Message}");
-                Console.ForegroundColor = initialColor;
-
+                Logger.Error($"Call error: {ex.Message}");
                 throw;
             }
         }
@@ -104,10 +101,7 @@ namespace org.apache.rocketmq
             where TRequest : class
             where TResponse : class
         {
-            var initialColor = Console.ForegroundColor;
-            Console.ForegroundColor = ConsoleColor.Green;
-            Console.WriteLine($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
-            Console.ForegroundColor = initialColor;
+            Logger.Debug($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
         }
 
         private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index 59fec83..a39a0e0 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -15,168 +15,310 @@
  * limitations under the License.
  */
 
-using System.Collections.Concurrent;
-
-using rmq = global::apache.rocketmq.v1;
-using Grpc.Net.Client;
+using rmq = Apache.Rocketmq.V2;
 using System;
+using System.IO;
+using System.IO.Compression;
 using System.Threading;
 using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using System.Collections.Generic;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
+using System.Security.Cryptography;
+using NLog;
 
-namespace org.apache.rocketmq {
-    public class ClientManager : IClientManager {
+namespace Org.Apache.Rocketmq
+{
+    public class ClientManager : IClientManager
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public ClientManager() {
-            rpcClients = new ConcurrentDictionary<string, RpcClient>();
+        public ClientManager()
+        {
+            _rpcClients = new Dictionary<string, RpcClient>();
+            _clientLock = new ReaderWriterLockSlim();
         }
 
-        public IRpcClient getRpcClient(string target) {
-            if (!rpcClients.ContainsKey(target)) {
-                var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions {
-                    HttpHandler = createHttpHandler()
-                });
-                var invoker = channel.Intercept(new ClientLoggerInterceptor());
-                var client = new rmq::MessagingService.MessagingServiceClient(invoker);
-                var rpcClient = new RpcClient(client);
-                if(rpcClients.TryAdd(target, rpcClient)) {
-                    return rpcClient;
+        public IRpcClient GetRpcClient(string target)
+        {
+            _clientLock.EnterReadLock();
+            try
+            {
+                // client exists, return in advance.
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
                 }
             }
-            return rpcClients[target];
+            finally
+            {
+                _clientLock.ExitReadLock();
+            }
+
+            _clientLock.EnterWriteLock();
+            try
+            {
+                // client exists, return in advance.
+                if (_rpcClients.ContainsKey(target))
+                {
+                    return _rpcClients[target];
+                }
+
+                // client does not exist, generate a new one
+                var client = new RpcClient(target);
+                _rpcClients.Add(target, client);
+                return client;
+            }
+            finally
+            {
+                _clientLock.ExitWriteLock();
+            }
         }
 
-        /**
-         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
-         * why parameters are configured this way.
-         */
-        public static HttpMessageHandler createHttpHandler()
+        public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata)
         {
-            var sslOptions = new System.Net.Security.SslClientAuthenticationOptions();
-            // Disable server certificate validation during development phase.
-            // Comment out the following line if server certificate validation is required. 
-            sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
-            var handler = new SocketsHttpHandler
-            {
-                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
-                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
-                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
-                EnableMultipleHttp2Connections = true,
-                SslOptions = sslOptions,
-            };
-            return handler;
+            var rpcClient = GetRpcClient(target);
+            return rpcClient.Telemetry(metadata);
         }
 
-        public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+        public async Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata,
+            rmq::QueryRouteRequest request, TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var queryRouteResponse = await rpcClient.queryRoute(request, callOptions);
+            var rpcClient = GetRpcClient(target);
+            Logger.Debug($"QueryRouteRequest: {request}");
+            var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
 
-            if (queryRouteResponse.Common.Status.Code != ((int)Google.Rpc.Code.Ok)) {
+            if (queryRouteResponse.Status.Code != rmq::Code.Ok)
+            {
+                Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status}");
                 // Raise an application layer exception
+            }
+            Logger.Debug($"QueryRouteResponse: {queryRouteResponse}");
 
+            var messageQueues = new List<rmq::MessageQueue>();
+            foreach (var messageQueue in queryRouteResponse.MessageQueues)
+            {
+                messageQueues.Add(messageQueue);
             }
+            var topicRouteData = new TopicRouteData(messageQueues);
+            return topicRouteData;
+        }
 
-            var partitions = new List<Partition>();
-            // Translate protobuf object to domain specific one
-            foreach (var partition in queryRouteResponse.Partitions)
+        // Sends `request` as a heartbeat via the RpcClient for `target`; true only when the response status code is Ok.
+        public async Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+        {
+            var client = GetRpcClient(target);
+            Logger.Debug($"Heartbeat to {target}, Request: {request}");
+            var status = (await client.Heartbeat(metadata, request, timeout)).Status;
+            Logger.Debug($"Heartbeat to {target} response status: {status}");
+            return rmq::Code.Ok == status.Code;
+        }
+
+        // Forwards `request` to the RpcClient for `target` and hands its response back unmodified.
+        public async Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata,
+            rmq::SendMessageRequest request, TimeSpan timeout)
+        {
+            var client = GetRpcClient(target);
+            return await client.SendMessage(metadata, request, timeout);
+        }
+
+        // Forwards the termination notice to the RpcClient for `target`; true only when the reply status code is Ok.
+        public async Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata,
+            rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        {
+            var client = GetRpcClient(target);
+            var reply = await client.NotifyClientTermination(metadata, request, timeout);
+            return rmq::Code.Ok == reply.Status.Code;
+        }
+
+        public async Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            rmq::QueryAssignmentResponse response = await rpcClient.QueryAssignment(metadata, request, timeout);
+            if (response.Status.Code != rmq::Code.Ok)
             {
-                var topic = new Topic(partition.Topic.ResourceNamespace, partition.Topic.Name);
-                var id = partition.Id;
-                Permission permission = Permission.READ_WRITE;
-                switch (partition.Permission) {
-                    case rmq::Permission.None:
+                // TODO: Build exception hierarchy
+                throw new Exception($"Failed to query load assignment from server. Cause: {response.Status.Message}");
+            }
+
+            List<rmq::Assignment> assignments = new List<rmq.Assignment>();
+            foreach (var item in response.Assignments)
+            {
+                assignments.Add(item);
+            }
+            return assignments;
+        }
+
+        public async Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, 
+            rmq::ReceiveMessageRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            List<rmq::ReceiveMessageResponse> response = await rpcClient.ReceiveMessage(metadata, request, timeout);
+
+            if (null == response || 0 == response.Count)
+            {
+                // TODO: throw an exception to propagate this error?
+                return new List<Message>();
+            }
+
+            List<Message> messages = new List<Message>();
+            
+            foreach (var entry in response)
+            {
+                switch (entry.ContentCase)
+                {
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.None:
                     {
-                        permission = Permission.NONE;
+                        Logger.Warn("Unexpected ReceiveMessageResponse content type");
                         break;
                     }
-                    case rmq::Permission.Read:
+                    
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.Status:
                     {
-                        permission = Permission.READ;
+                        switch (entry.Status.Code)
+                        {
+                            case rmq.Code.Ok:
+                            {
+                                break;
+                            }
+
+                            case rmq.Code.Forbidden:
+                            {
+                                Logger.Warn("Receive message denied");
+                                break;
+                            }
+                            case rmq.Code.TooManyRequests:
+                            {
+                                Logger.Warn("TooManyRequest: servers throttled");
+                                break;
+                            }
+                            default:
+                            {
+                                Logger.Warn("Unknown error status");
+                                break;
+                            }
+                        }
                         break;
                     }
-                    case rmq::Permission.Write:
+
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.Message:
                     {
-                        permission = Permission.WRITE;
+                        var message = Convert(target, entry.Message);
+                        messages.Add(message);
                         break;
                     }
-                    case rmq::Permission.ReadWrite:
+
+                    case rmq.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
                     {
-                        permission = Permission.READ_WRITE;
+                        var begin = entry.DeliveryTimestamp;
+                        var costs = DateTime.UtcNow - begin.ToDateTime();
+                        // TODO: Collect metrics
                         break;
                     }
                 }
+            }
+            return messages;
+        }
 
-                AddressScheme scheme = AddressScheme.IPv4;
-                switch(partition.Broker.Endpoints.Scheme) {
-                    case rmq::AddressScheme.Ipv4:
-                        {
-                        scheme = AddressScheme.IPv4;
-                        break;
-                    }
-                    case rmq::AddressScheme.Ipv6:
-                        {
-                        scheme = AddressScheme.IPv6;
-                        break;
-                    }
-                    case rmq::AddressScheme.DomainName:
-                        {
-                        scheme = AddressScheme.DOMAIN_NAME;
-                        break;
-                    }
-                }
+        private Message Convert(string sourceHost, rmq::Message message)
+        {
+            var msg = new Message();
+            msg.Topic = message.Topic.Name;
+            msg.MessageId = message.SystemProperties.MessageId;
+            msg.Tag = message.SystemProperties.Tag;
 
-                List<Address> addresses = new List<Address>();
-                foreach(var item in partition.Broker.Endpoints.Addresses) {
-                    addresses.Add(new Address(item.Host, item.Port));
+            // Validate message body checksum
+            byte[] raw = message.Body.ToByteArray();
+            if (rmq::DigestType.Crc32 == message.SystemProperties.BodyDigest.Type)
+            {
+                uint checksum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length);
+                if (!message.SystemProperties.BodyDigest.Checksum.Equals(checksum.ToString("X")))
+                {
+                    msg._checksumVerifiedOk = false;
+                }
+            }
+            else if (rmq::DigestType.Md5 == message.SystemProperties.BodyDigest.Type)
+            {
+                var checksum = MD5.HashData(raw);
+                if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
+                {
+                    msg._checksumVerifiedOk = false;
+                }
+            }
+            else if (rmq::DigestType.Sha1 == message.SystemProperties.BodyDigest.Type)
+            {
+                var checksum = SHA1.HashData(raw);
+                if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
+                {
+                    msg._checksumVerifiedOk = false;
                 }
-                ServiceAddress serviceAddress = new ServiceAddress(scheme, addresses);
-                Broker broker = new Broker(partition.Broker.Name, id, serviceAddress);
-                partitions.Add(new Partition(topic, broker, id, permission));
             }
 
-            var topicRouteData = new TopicRouteData(partitions);
-            return topicRouteData;
-        }
+            foreach (var entry in message.UserProperties)
+            {
+                msg.UserProperties.Add(entry.Key, entry.Value);
+            }
 
-        public async Task<Boolean> heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
-        {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc.CallOptions(metadata, deadline);
-            var response = await rpcClient.heartbeat(request, callOptions);
-            if (null == response)
+            msg._receiptHandle = message.SystemProperties.ReceiptHandle;
+            msg._sourceHost = sourceHost;
+
+            foreach (var key in message.SystemProperties.Keys)
             {
-                return false;
+                msg.Keys.Add(key);
             }
 
-            return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+            msg.DeliveryAttempt = message.SystemProperties.DeliveryAttempt;
+
+            if (message.SystemProperties.BodyEncoding == rmq::Encoding.Gzip)
+            {
+                // Decompress/Inflate message body
+                var inputStream = new MemoryStream(message.Body.ToByteArray());
+                var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress);
+                var outputStream = new MemoryStream();
+                gzipStream.CopyTo(outputStream);
+                msg.Body = outputStream.ToArray();
+            }
+            else
+            {
+                msg.Body = message.Body.ToByteArray();
+            }
+
+            return msg;
         }
 
-        public async Task<rmq::SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
+        public async Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = await rpcClient.sendMessage(request, callOptions);
-            return response;
+            var rpcClient = GetRpcClient(target);
+            var response = await rpcClient.AckMessage(metadata, request, timeout);
+            return response.Status.Code == rmq::Code.Ok;
         }
 
-        public async Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        public async Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout)
         {
-            var rpcClient = getRpcClient(target);
-            var deadline = DateTime.UtcNow.Add(timeout);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            rmq::NotifyClientTerminationResponse response = await rpcClient.notifyClientTermination(request, callOptions);
-            return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+            var rpcClient = GetRpcClient(target);
+            var response = await rpcClient.ChangeInvisibleDuration(metadata, request, timeout);
+            return response.Status.Code == rmq::Code.Ok;
         }
 
-        private ConcurrentDictionary<string, RpcClient> rpcClients;
+        // Starts Shutdown() on every cached RpcClient and completes when all of them have finished.
+        public async Task Shutdown()
+        {
+            List<Task> tasks = new List<Task>();
+            _clientLock.EnterReadLock(); // held only while enumerating _rpcClients
+            try
+            {
+                foreach (var item in _rpcClients)
+                {
+                    tasks.Add(item.Value.Shutdown());
+                }
+            }
+            finally
+            {
+                _clientLock.ExitReadLock();
+            }
+            await Task.WhenAll(tasks); // after release: the lock is thread-affine and await may resume on another thread
+        }
 
+        private readonly Dictionary<string, RpcClient> _rpcClients;
+        private readonly ReaderWriterLockSlim _clientLock;
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
index 3ca211d..9d03994 100644
--- a/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
+++ b/csharp/rocketmq-client-csharp/ClientManagerFactory.cs
@@ -18,7 +18,7 @@ using System;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public sealed class ClientManagerFactory
     {
diff --git a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 1381b3f..39dfd7e 100644
--- a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -19,36 +19,46 @@ using System;
 using System.Text.Json;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     /**
      * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
      * A sample config content is as follows:
      * {"AccessKey": "key", "AccessSecret": "secret"}
      */
-    public class ConfigFileCredentialsProvider : ICredentialsProvider {
+    public class ConfigFileCredentialsProvider : ICredentialsProvider
+    {
 
-        public ConfigFileCredentialsProvider() {
+        public ConfigFileCredentialsProvider()
+        {
             var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
             string configFileRelativePath = "/.rocketmq/config";
-            if (!File.Exists(home + configFileRelativePath)) {
+            if (!File.Exists(home + configFileRelativePath))
+            {
                 return;
             }
 
-            try {
-                using (var reader = new StreamReader(home + configFileRelativePath)) {
+            try
+            {
+                using (var reader = new StreamReader(home + configFileRelativePath))
+                {
                     string json = reader.ReadToEnd();
                     var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
                     accessKey = kv["AccessKey"];
                     accessSecret = kv["AccessSecret"];
                     valid = true;
-                } 
-            } catch (IOException e) {
+                }
+            }
+            catch (IOException)
+            {
             }
         }
 
-        public Credentials getCredentials() {
-            if (!valid) {
+        public Credentials getCredentials()
+        {
+            if (!valid)
+            {
                 return null;
             }
 
diff --git a/csharp/rocketmq-client-csharp/Credentials.cs b/csharp/rocketmq-client-csharp/Credentials.cs
index 2da9581..a73b000 100644
--- a/csharp/rocketmq-client-csharp/Credentials.cs
+++ b/csharp/rocketmq-client-csharp/Credentials.cs
@@ -17,27 +17,34 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class Credentials {
+namespace Org.Apache.Rocketmq
+{
+    public class Credentials
+    {
 
-        public Credentials(string accessKey, string accessSecret) {
+        public Credentials(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) {
+        public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
             this.sessionToken = sessionToken;
             this.expirationInstant = expirationInstant;
         }
 
-        public bool empty() {
+        public bool empty()
+        {
             return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret);
         }
 
-        public bool expired() {
-            if (DateTime.MinValue == expirationInstant) {
+        public bool expired()
+        {
+            if (DateTime.MinValue == expirationInstant)
+            {
                 return false;
             }
 
@@ -45,17 +52,20 @@ namespace org.apache.rocketmq {
         }
 
         private string accessKey;
-        public string AccessKey {
+        public string AccessKey
+        {
             get { return accessKey; }
         }
-        
+
         private string accessSecret;
-        public string AccessSecret {
+        public string AccessSecret
+        {
             get { return accessSecret; }
         }
 
         private string sessionToken;
-        public string SessionToken {
+        public string SessionToken
+        {
             get { return sessionToken; }
         }
 
diff --git a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs b/csharp/rocketmq-client-csharp/ExpressionType.cs
similarity index 87%
copy from csharp/rocketmq-client-csharp/ICredentialsProvider.cs
copy to csharp/rocketmq-client-csharp/ExpressionType.cs
index 6e7112e..0caaf8e 100644
--- a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/ExpressionType.cs
@@ -14,8 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public interface ICredentialsProvider {
-        Credentials getCredentials();
+namespace Org.Apache.Rocketmq
+{
+
+    public enum ExpressionType
+    {
+        TAG,
+        SQL92,
     }
+
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/FilterExpression.cs
similarity index 74%
copy from csharp/rocketmq-client-csharp/TopicRouteException.cs
copy to csharp/rocketmq-client-csharp/FilterExpression.cs
index b520e72..3bd432d 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/FilterExpression.cs
@@ -14,15 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
 {
-    public class TopicRouteException : Exception
+    public class FilterExpression
     {
-        public TopicRouteException(string message) : base(message)
+        public FilterExpression(string expression, ExpressionType type)
         {
-
+            Expression = expression;
+            Type = type;
         }
 
+        public ExpressionType Type { get; }
+        public string Expression { get; }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index 7f3ed64..3352028 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -16,17 +16,24 @@
  */
 
 using System.Threading.Tasks;
+using System.Threading;
+using System;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public interface IClient : IClientConfig
     {
 
-        void heartbeat();
+        Task Heartbeat();
 
-        void healthCheck();
+        Task<bool> NotifyClientTermination();
 
-        Task<bool> notifyClientTermination();
+        void BuildClientSetting(rmq::Settings settings);
 
+
+        void OnSettingsReceived(rmq::Settings settings);
+
+        CancellationTokenSource TelemetryCts();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs
index b83006c..438d7a8 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -16,8 +16,10 @@
  */
 using System;
 
-namespace org.apache.rocketmq {
-    public interface IClientConfig {
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientConfig
+    {
         string region();
 
         string serviceName();
@@ -28,8 +30,6 @@ namespace org.apache.rocketmq {
 
         string tenantId();
 
-        TimeSpan getIoTimeout();
-
         TimeSpan getLongPollingTimeout();
 
         string getGroupName();
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 08ed86a..afccfde 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -15,22 +15,37 @@
  * limitations under the License.
  */
 
-using apache.rocketmq.v1;
 using System.Threading.Tasks;
 using System;
+using System.Collections.Generic;
 using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
-    public interface IClientManager {
-        IRpcClient getRpcClient(string target);
 
-        Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+namespace Org.Apache.Rocketmq
+{
+    public interface IClientManager
+    {
+        IRpcClient GetRpcClient(string target);
 
-        Task<Boolean> heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+        grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
 
-        Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
+        Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout);
 
-        Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+        Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
 
+        Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout);
+
+        Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout);
+
+        Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout);
+
+        Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, rmq::ReceiveMessageRequest request, TimeSpan timeout);
+
+        Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout);
+
+        Task<Boolean> ChangeInvisibleDuration(string target, grpc::Metadata metadata, rmq::ChangeInvisibleDurationRequest request, TimeSpan timeout);
+
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageType.cs b/csharp/rocketmq-client-csharp/IConsumer.cs
similarity index 84%
copy from csharp/rocketmq-client-csharp/MessageType.cs
copy to csharp/rocketmq-client-csharp/IConsumer.cs
index 376b658..2ad0dab 100644
--- a/csharp/rocketmq-client-csharp/MessageType.cs
+++ b/csharp/rocketmq-client-csharp/IConsumer.cs
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
 
-namespace org.apache.rocketmq
+using System.Threading.Tasks;
+namespace Org.Apache.Rocketmq
 {
-    public enum MessageType {
-        Normal,
-        Fifo,
-        Delay,
-        Transaction,
+    public interface IConsumer
+    {
+        Task Start();
+
+        Task Shutdown();
     }
-    
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
index 6e7112e..1fb892b 100644
--- a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public interface ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public interface ICredentialsProvider
+    {
         Credentials getCredentials();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IMessageListener.cs
similarity index 82%
copy from csharp/rocketmq-client-csharp/IClient.cs
copy to csharp/rocketmq-client-csharp/IMessageListener.cs
index 7f3ed64..f46efd5 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IMessageListener.cs
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
+using System.Collections.Generic;
 using System.Threading.Tasks;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
-    {
-
-        void heartbeat();
-
-        void healthCheck();
 
-        Task<bool> notifyClientTermination();
+    public interface IMessageListener
+    {
+        Task Consume(List<Message> messages, List<Message> failed);
 
     }
+
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IProducer.cs b/csharp/rocketmq-client-csharp/IProducer.cs
index 89f8955..420af20 100644
--- a/csharp/rocketmq-client-csharp/IProducer.cs
+++ b/csharp/rocketmq-client-csharp/IProducer.cs
@@ -14,16 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
+
 using System.Threading.Tasks;
 
-namespace org.apache.rocketmq {
-    public interface IProducer {
-        void start();
+namespace Org.Apache.Rocketmq
+{
+    public interface IProducer
+    {
+        Task Start();
 
-        void shutdown();
+        Task Shutdown();
 
-        Task<SendResult> send(Message message);
-        
+        Task<SendReceipt> Send(Message message);
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs
index 0590bb0..146d4f7 100644
--- a/csharp/rocketmq-client-csharp/IRpcClient.cs
+++ b/csharp/rocketmq-client-csharp/IRpcClient.cs
@@ -14,21 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
+using System.Collections.Generic;
 using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public interface IRpcClient
     {
-        Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
+        AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Metadata metadata);
+
+        Task<QueryRouteResponse> QueryRoute(Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+
+        Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+
+        Task<SendMessageResponse> SendMessage(Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
+        Task<QueryAssignmentResponse> QueryAssignment(Metadata metadata, QueryAssignmentRequest request,
+            TimeSpan timeout);
+
+        Task<List<ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, ReceiveMessageRequest request, TimeSpan timeout);
+
+        Task<AckMessageResponse> AckMessage(Metadata metadata, AckMessageRequest request, TimeSpan timeout);
+
+        Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, ChangeInvisibleDurationRequest request, TimeSpan timeout);
+
+        Task<ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,
+            ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout);
+
+        Task<EndTransactionResponse> EndTransaction(Metadata metadata, EndTransactionRequest request, TimeSpan timeout);
 
-        Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions);
 
-        Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions);
+        Task<NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+            NotifyClientTerminationRequest request, TimeSpan timeout);
 
-        Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions);
+        Task Shutdown();
     }
-}
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs
index 1e6ee0e..b527311 100644
--- a/csharp/rocketmq-client-csharp/Message.cs
+++ b/csharp/rocketmq-client-csharp/Message.cs
@@ -14,79 +14,111 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
 using System.Collections.Generic;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
 {
 
-    public class Message {
-        public Message() : this(null, null) {
+    public class Message
+    {
+        public Message() : this(null, null)
+        {
         }
 
-        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+        public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
 
-        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+        public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
+        {
         }
 
-        public Message(string topic, string tag, List<string> keys, byte[] body) {
-            this.messageId = SequenceGenerator.Instance.Next();
-            this.maxAttemptTimes = 3;
-            this.topic = topic;
-            this.tag = tag;
-            this.keys = keys;
-            this.body = body;
-            this.userProperties = new Dictionary<string, string>();
-            this.systemProperties = new Dictionary<string, string>();
+        public Message(string topic, string tag, List<string> keys, byte[] body)
+        {
+            MessageId = SequenceGenerator.Instance.Next();
+            MaxAttemptTimes = 3;
+            Topic = topic;
+            Tag = tag;
+            Keys = keys;
+            Body = body;
+            UserProperties = new Dictionary<string, string>();
+            DeliveryTimestamp = DateTime.MinValue;
         }
 
-        private string messageId;
         public string MessageId
         {
-            get { return messageId; }
+            get;
+            internal set;
+        }
+        
+        public string Topic
+        {
+            get;
+            set;
         }
 
-        private string topic;
-
-        public string Topic {
-            get { return topic; }
-            set { this.topic = value; }
+        public byte[] Body
+        {
+            get;
+            set;
         }
 
-        private byte[] body;
-        public byte[] Body {
-            get { return body; }
-            set { this.body = value; }
+        public string Tag
+        {
+            get;
+            set;
         }
 
-        private string tag;
-        public string Tag {
-            get { return tag; }
-            set { this.tag = value; }
+        public List<string> Keys
+        {
+            get;
+            set;
         }
 
-        private List<string> keys;
-        public List<string> Keys{
-            get { return keys; }
-            set { this.keys = value; }
+        public Dictionary<string, string> UserProperties
+        {
+            get;
+            set;
         }
 
-        private Dictionary<string, string> userProperties;
-        public Dictionary<string, string> UserProperties {
-            get { return userProperties; }
-            set { this.userProperties = value; }
+        public int MaxAttemptTimes
+        {
+            get;
+            set;
         }
 
-        private Dictionary<string, string> systemProperties;
-        internal Dictionary<string, string> SystemProperties {
-            get { return systemProperties; }
-            set { this.systemProperties = value; }
+
+        public DateTime DeliveryTimestamp
+        {
+            get;
+            set;
+        }
+        
+        public int DeliveryAttempt
+        {
+            get;
+            internal set;
+        }
+        
+        public string MessageGroup
+        {
+            get;
+            set;
+        }
+        
+        public bool Fifo()
+        {
+            return !String.IsNullOrEmpty(MessageGroup);
         }
 
-        private int maxAttemptTimes;
-        public int MaxAttemptTimes
+        public bool Scheduled()
         {
-            get { return maxAttemptTimes; }
-            set { maxAttemptTimes = value; }
+            return DeliveryTimestamp > DateTime.UtcNow;
         }
+
+        internal bool _checksumVerifiedOk = true;
+        internal string _receiptHandle;
+        internal string _sourceHost;
     }
 
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/MessageException.cs
similarity index 84%
copy from csharp/rocketmq-client-csharp/TopicRouteException.cs
copy to csharp/rocketmq-client-csharp/MessageException.cs
index b520e72..7ef10df 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/MessageException.cs
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
 {
-    public class TopicRouteException : Exception
+    [Serializable]
+    public class MessageException : Exception
     {
-        public TopicRouteException(string message) : base(message)
+        public MessageException(string message) : base(message)
         {
-
         }
-
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
index 8af1fda..8dc370d 100644
--- a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
+++ b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
@@ -20,7 +20,7 @@ using System.Diagnostics;
 using System.IO;
 using System.Threading;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * MessageId generate rules refer: https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o
diff --git a/csharp/rocketmq-client-csharp/MessageType.cs b/csharp/rocketmq-client-csharp/MessageType.cs
index 376b658..a459e93 100644
--- a/csharp/rocketmq-client-csharp/MessageType.cs
+++ b/csharp/rocketmq-client-csharp/MessageType.cs
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public enum MessageType {
+    public enum MessageType
+    {
         Normal,
         Fifo,
         Delay,
         Transaction,
     }
-    
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MetadataConstants.cs b/csharp/rocketmq-client-csharp/MetadataConstants.cs
index 184bec8..5381595 100644
--- a/csharp/rocketmq-client-csharp/MetadataConstants.cs
+++ b/csharp/rocketmq-client-csharp/MetadataConstants.cs
@@ -17,8 +17,10 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class MetadataConstants {
+namespace Org.Apache.Rocketmq
+{
+    public class MetadataConstants
+    {
         public const string TENANT_ID_KEY = "x-mq-tenant-id";
         public const string NAMESPACE_KEY = "x-mq-namespace";
         public const string AUTHORIZATION = "authorization";
@@ -33,5 +35,7 @@ namespace org.apache.rocketmq {
         public const string CLIENT_VERSION_KEY = "x-mq-client-version";
         public const string PROTOCOL_VERSION_KEY = "x-mq-protocol-version";
         public const string REQUEST_ID_KEY = "x-mq-request-id";
+
+        public const string CLIENT_ID_KEY = "x-mq-client-id";
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs
index 0608c8a..3d294bd 100644
--- a/csharp/rocketmq-client-csharp/MqLogManager.cs
+++ b/csharp/rocketmq-client-csharp/MqLogManager.cs
@@ -4,7 +4,7 @@ using System.Reflection;
 using NLog;
 using NLog.Config;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * RocketMQ Log Manager.
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs
similarity index 61%
copy from csharp/rocketmq-client-csharp/TopicRouteException.cs
copy to csharp/rocketmq-client-csharp/ProcessQueue.cs
index b520e72..1022978 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs
@@ -15,13 +15,28 @@
  * limitations under the License.
  */
 using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public class TopicRouteException : Exception
+    public class ProcessQueue
     {
-        public TopicRouteException(string message) : base(message)
+
+        public ProcessQueue()
         {
+            _lastReceivedTime = DateTime.UtcNow;
+        }
+        public bool Dropped { get; set; }
 
+        private DateTime _lastReceivedTime;
+
+        public DateTime LastReceiveTime
+        {
+            get { return _lastReceivedTime; }
+            set { _lastReceivedTime = value; }
+        }
+
+        internal bool Expired()
+        {
+            return DateTime.UtcNow.Subtract(_lastReceivedTime).TotalMilliseconds > 30 * 1000;
         }
 
     }
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 0b3f8a0..5c51cdc 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -14,94 +14,149 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
 using System.Threading.Tasks;
-using rmq = apache.rocketmq.v1;
-using pb = global::Google.Protobuf;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
-
-
-namespace org.apache.rocketmq
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using NLog;
+using OpenTelemetry;
+using OpenTelemetry.Exporter;
+using OpenTelemetry.Metrics;
+
+namespace Org.Apache.Rocketmq
 {
     public class Producer : Client, IProducer
     {
-        public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
+        public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
         {
-            this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _sendFailureTotal = MetricMeter.CreateCounter<long>("rocketmq_send_failure_total");
+            _sendLatency = MetricMeter.CreateHistogram<double>(SendLatencyName, 
+                description: "Measure the duration of publishing messages to brokers",
+                unit: "milliseconds");
         }
 
-        public override void start()
+        public override async Task Start()
         {
-            base.start();
-            // More initalization
+            await base.Start();
+            // More initialization
+            // TODO: Add authentication header
+
+            _meterProvider = Sdk.CreateMeterProviderBuilder()
+                .AddMeter("Apache.RocketMQ.Client")
+                .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+                {
+                    options.Protocol = OtlpExportProtocol.Grpc;
+                    options.Endpoint = new Uri(_accessPoint.TargetUrl());
+                    options.TimeoutMilliseconds = (int) _clientSettings.RequestTimeout.ToTimeSpan().TotalMilliseconds;
+
+                    readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 60 * 1000;
+                })
+                .AddView((instrument) =>
+                {
+                    if (instrument.Meter.Name == MeterName && instrument.Name == SendLatencyName)
+                    {
+                        return new ExplicitBucketHistogramConfiguration()
+                        {
+                            Boundaries = new double[] {1, 5, 10, 20, 50, 200, 500},
+                        };
+                    }
+                    return null;
+                })
+                .Build();
         }
 
-        public override void shutdown()
+        public override async Task Shutdown()
         {
             // Release local resources
-            base.shutdown();
+            await base.Shutdown();
         }
 
-        public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
+            request.ClientType = rmq::ClientType.Producer;
 
+            // Concept of ProducerGroup has been removed.
         }
 
-        public async Task<SendResult> send(Message message)
+        public async Task<SendReceipt> Send(Message message)
         {
-            if (!loadBalancer.ContainsKey(message.Topic))
+            if (!_loadBalancer.ContainsKey(message.Topic))
             {
-                var topicRouteData = await getRouteFor(message.Topic, false);
-                if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+                var topicRouteData = await GetRouteFor(message.Topic, false);
+                if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
                 {
+                    Logger.Error($"Failed to resolve route info for {message.Topic}");
                     throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
                 }
 
                 var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
-                loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+                _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
             }
 
-            var publishLB = loadBalancer[message.Topic];
+            var publishLb = _loadBalancer[message.Topic];
 
             var request = new rmq::SendMessageRequest();
-            request.Message = new rmq::Message();
-            request.Message.Body = pb::ByteString.CopyFrom(message.Body);
-            request.Message.Topic = new rmq::Resource();
-            request.Message.Topic.ResourceNamespace = resourceNamespace();
-            request.Message.Topic.Name = message.Topic;
+            var entry = new rmq::Message();
+            entry.Body = ByteString.CopyFrom(message.Body);
+            entry.Topic = new rmq::Resource();
+            entry.Topic.ResourceNamespace = resourceNamespace();
+            entry.Topic.Name = message.Topic;
+            request.Messages.Add(entry);
 
             // User properties
             foreach (var item in message.UserProperties)
             {
-                request.Message.UserAttribute.Add(item.Key, item.Value);
+                entry.UserProperties.Add(item.Key, item.Value);
             }
 
-            request.Message.SystemAttribute = new rmq::SystemAttribute();
-            request.Message.SystemAttribute.MessageId = message.MessageId;
+            entry.SystemProperties = new rmq::SystemProperties();
+            entry.SystemProperties.MessageId = message.MessageId;
+            entry.SystemProperties.MessageType = rmq::MessageType.Normal;
+            if (DateTime.MinValue != message.DeliveryTimestamp)
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Delay;
+                entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+
+                if (message.Fifo())
+                {
+                    Logger.Warn("A message may not be FIFO and delayed at the same time");
+                    throw new MessageException("A message may not be both FIFO and Timed");
+                }
+            } else if (!String.IsNullOrEmpty(message.MessageGroup))
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
+                entry.SystemProperties.MessageGroup = message.MessageGroup;
+            }
+            
             if (!string.IsNullOrEmpty(message.Tag))
             {
-                request.Message.SystemAttribute.Tag = message.Tag;
+                entry.SystemProperties.Tag = message.Tag;
             }
 
             if (0 != message.Keys.Count)
             {
                 foreach (var key in message.Keys)
                 {
-                    request.Message.SystemAttribute.Keys.Add(key);
+                    entry.SystemProperties.Keys.Add(key);
                 }
             }
 
-            // string target = "https://";
             List<string> targets = new List<string>();
-            List<Partition> candidates = publishLB.select(message.MaxAttemptTimes);
-            foreach (var partition in candidates)
+            List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
+            foreach (var messageQueue in candidates)
             {
-                targets.Add(partition.Broker.targetUrl());
+                targets.Add(Utilities.TargetUrl(messageQueue));
             }
 
-            var metadata = new grpc::Metadata();
+            var metadata = new Metadata();
             Signature.sign(this, metadata);
 
             Exception ex = null;
@@ -110,27 +165,46 @@ namespace org.apache.rocketmq
             {
                 try
                 {
-                    rmq::SendMessageResponse response = await clientManager.sendMessage(target, metadata, request, getIoTimeout());
-                    if (null != response && (int)global::Google.Rpc.Code.Ok == response.Common.Status.Code)
+                    var stopWatch = new Stopwatch();
+                    stopWatch.Start();
+                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
+                    if (null != response && rmq::Code.Ok == response.Status.Code)
                     {
-                        var messageId = response.MessageId;
-                        return new SendResult(messageId);
+                        var messageId = response.Entries[0].MessageId;
+                        
+                        // Account latency histogram
+                        stopWatch.Stop();
+                        var latency = stopWatch.ElapsedMilliseconds;
+                        _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
+                        
+                        return new SendReceipt(messageId);
                     }
                 }
                 catch (Exception e)
                 {
+                    // Account failure count
+                    _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));                    
+                    Logger.Info(e, $"Failed to send message to {target}");
                     ex = e;
                 }
             }
 
             if (null != ex)
             {
+                Logger.Error(ex, $"Failed to send message after {message.MaxAttemptTimes} attempts");
                 throw ex;
             }
 
+            Logger.Error($"Failed to send message after {message.MaxAttemptTimes} attempts with unspecified reasons");
             throw new Exception("Send message failed");
         }
 
-        private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
+        private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
+
+        private readonly Counter<long> _sendFailureTotal;
+        private readonly Histogram<double> _sendLatency;
+
+        private static readonly string SendLatencyName = "rocketmq_send_success_cost_time";
+        private MeterProvider _meterProvider;
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
new file mode 100644
index 0000000..7dbb702
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/admin.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package apache.rocketmq.v2;
+
+option cc_enable_arenas = true;
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQAdmin";
+
+message ChangeLogLevelRequest {
+  enum Level {
+    TRACE = 0;
+    DEBUG = 1;
+    INFO = 2;
+    WARN = 3;
+    ERROR = 4;
+  }
+  Level level = 1;
+}
+
+message ChangeLogLevelResponse { string remark = 1; }
+
+service Admin {
+  rpc ChangeLogLevel(ChangeLogLevelRequest) returns (ChangeLogLevelResponse) {}
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
new file mode 100644
index 0000000..21a6321
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/definition.proto
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/duration.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQDomain";
+
+enum TransactionResolution {
+  TRANSACTION_RESOLUTION_UNSPECIFIED = 0;
+  COMMIT = 1;
+  ROLLBACK = 2;
+}
+
+enum TransactionSource {
+  SOURCE_UNSPECIFIED = 0;
+  SOURCE_CLIENT = 1;
+  SOURCE_SERVER_CHECK = 2;
+}
+
+enum Permission {
+  PERMISSION_UNSPECIFIED = 0;
+  NONE = 1;
+  READ = 2;
+  WRITE = 3;
+  READ_WRITE = 4;
+}
+
+enum FilterType {
+  FILTER_TYPE_UNSPECIFIED = 0;
+  TAG = 1;
+  SQL = 2;
+}
+
+message FilterExpression {
+  FilterType type = 1;
+  string expression = 2;
+}
+
+message RetryPolicy {
+  int32 max_attempts = 1;
+  oneof strategy {
+    ExponentialBackoff exponential_backoff = 2;
+    CustomizedBackoff customized_backoff = 3;
+  }
+}
+
+// https://en.wikipedia.org/wiki/Exponential_backoff
+message ExponentialBackoff {
+  google.protobuf.Duration initial = 1;
+  google.protobuf.Duration max = 2;
+  float multiplier = 3;
+}
+
+message CustomizedBackoff {
+  // To support classic backoff strategy which is arbitrarily defined by end users.
+  // Typical values are: `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`
+  repeated google.protobuf.Duration next = 1;
+}
+
+message Resource {
+  string resource_namespace = 1;
+
+  // Resource name identifier, which remains unique within the abstract resource
+  // namespace.
+  string name = 2;
+}
+
+message SubscriptionEntry {
+  Resource topic = 1;
+  FilterExpression expression = 2;
+}
+
+enum AddressScheme {
+  ADDRESS_SCHEME_UNSPECIFIED = 0;
+  IPv4 = 1;
+  IPv6 = 2;
+  DOMAIN_NAME = 3;
+}
+
+message Address {
+  string host = 1;
+  int32 port = 2;
+}
+
+message Endpoints {
+  AddressScheme scheme = 1;
+  repeated Address addresses = 2;
+}
+
+message Broker {
+  // Name of the broker
+  string name = 1;
+
+  // Broker index. Canonically, index = 0 implies that the broker is playing
+  // leader role while brokers with index > 0 play follower role.
+  int32 id = 2;
+
+  // Address of the broker, complying with the following scheme
+  // 1. dns:[//authority/]host[:port]
+  // 2. ipv4:address[:port][,address[:port],...] – IPv4 addresses
+  // 3. ipv6:address[:port][,address[:port],...] – IPv6 addresses
+  Endpoints endpoints = 3;
+}
+
+message MessageQueue {
+  Resource topic = 1;
+  int32 id = 2;
+  Permission permission = 3;
+  Broker broker = 4;
+  repeated MessageType accept_message_types = 5;
+}
+
+enum MessageType {
+  MESSAGE_TYPE_UNSPECIFIED = 0;
+
+  NORMAL = 1;
+
+  // Sequenced message
+  FIFO = 2;
+
+  // Messages that are delivered after the specified duration.
+  DELAY = 3;
+
+  // Messages that are transactional. Only committed messages are delivered to
+  // subscribers.
+  TRANSACTION = 4;
+}
+
+enum DigestType {
+  DIGEST_TYPE_UNSPECIFIED = 0;
+
+  // CRC algorithm achieves goal of detecting random data error with lowest
+  // computation overhead.
+  CRC32 = 1;
+
+  // MD5 algorithm achieves good balance between collision rate and computation
+  // overhead.
+  MD5 = 2;
+
+  // SHA-family has substantially fewer collisions with a fair amount of
+  // computation.
+  SHA1 = 3;
+}
+
+// When publishing messages to or subscribing messages from brokers, clients
+// shall include or validate digests of message body to ensure data integrity.
+//
+// For message publishing, when an invalid digest is detected, brokers need
+// to respond to the client with BAD_REQUEST.
+//
+// For message subscription, when an invalid digest is detected, consumers
+// need to handle this case according to message type:
+// 1) Standard messages should be negatively acknowledged instantly, causing
+// immediate re-delivery; 2) FIFO messages require special RPC, to re-fetch
+// previously acquired messages batch;
+//
+// Message consumption model also affects how invalid digests are handled. When
+// messages are consumed in broadcasting way,
+// TODO: define semantics of invalid-digest-when-broadcasting.
+message Digest {
+  DigestType type = 1;
+  string checksum = 2;
+}
+
+enum ClientType {
+  CLIENT_TYPE_UNSPECIFIED = 0;
+  PRODUCER = 1;
+  PUSH_CONSUMER = 2;
+  SIMPLE_CONSUMER = 3;
+}
+
+enum Encoding {
+  ENCODING_UNSPECIFIED = 0;
+
+  IDENTITY = 1;
+
+  GZIP = 2;
+}
+
+message SystemProperties {
+  // Tag, which is optional.
+  optional string tag = 1;
+
+  // Message keys
+  repeated string keys = 2;
+
+  // Message identifier, client-side generated, remains unique.
+  // if message_id is empty, the send message request will be aborted with
+  // status `INVALID_ARGUMENT`
+  string message_id = 3;
+
+  // Message body digest
+  Digest body_digest = 4;
+
+  // Message body encoding. Candidate options are identity, gzip, snappy etc.
+  Encoding body_encoding = 5;
+
+  // Message type, normal, FIFO or transactional.
+  MessageType message_type = 6;
+
+  // Message born time-point.
+  google.protobuf.Timestamp born_timestamp = 7;
+
+  // Message born host. Valid options are IPv4, IPv6 or client host domain name.
+  string born_host = 8;
+
+  // Time-point at which the message is stored in the broker, which is absent
+  // for message publishing.
+  optional google.protobuf.Timestamp store_timestamp = 9;
+
+  // The broker that stores this message. It may be broker name, IP or arbitrary
+  // identifier that uniquely identify the server.
+  string store_host = 10;
+
+  // Time-point at which broker delivers to clients, which is optional.
+  optional google.protobuf.Timestamp delivery_timestamp = 11;
+
+  // If a message is acquired by way of POP, this field holds the receipt,
+  // which is absent for message publishing.
+  // Clients use the receipt to acknowledge or negatively acknowledge the
+  // message.
+  optional string receipt_handle = 12;
+
+  // Message queue identifier in which a message is physically stored.
+  int32 queue_id = 13;
+
+  // Message-queue offset at which a message is stored, which is absent for
+  // message publishing.
+  optional int64 queue_offset = 14;
+
+  // Period of time servers would remain invisible once a message is acquired.
+  optional google.protobuf.Duration invisible_duration = 15;
+
+  // Business code may fail to process messages for the moment. Hence, clients
+  // may request servers to deliver them again using certain back-off strategy,
+  // the attempt is 1 not 0 if message is delivered first time, and it is absent
+  // for message publishing.
+  optional int32 delivery_attempt = 16;
+
+  // Define the group name of message in the same topic, which is optional.
+  optional string message_group = 17;
+
+  // Trace context for each message, which is optional.
+  optional string trace_context = 18;
+
+  // If a transactional message stays unresolved for more than
+  // `transaction_orphan_threshold`, it would be regarded as an
+  // orphan. Servers that manage orphan messages would pick up
+  // a capable publisher to resolve it.
+  optional google.protobuf.Duration orphaned_transaction_recovery_duration = 19;
+}
+
+message Message {
+
+  Resource topic = 1;
+
+  // User defined key-value pairs.
+  // If user_properties contain the reserved keys by RocketMQ,
+  // the send message request will be aborted with status `INVALID_ARGUMENT`.
+  // See below links for the reserved keys
+  // https://github.com/apache/rocketmq/blob/master/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java#L58
+  map<string, string> user_properties = 2;
+
+  SystemProperties system_properties = 3;
+
+  bytes body = 4;
+}
+
+message Assignment { MessageQueue message_queue = 1; }
+
+enum Code {
+  // Success.
+  OK = 0;
+  // Format of access point is illegal.
+  ILLEGAL_ACCESS_POINT = 1;
+  // Format of topic is illegal.
+  ILLEGAL_TOPIC = 2;
+  // Format of consumer group is illegal.
+  ILLEGAL_CONSUMER_GROUP = 3;
+  // Format of message tag is illegal.
+  ILLEGAL_MESSAGE_TAG = 4;
+  // Format of message key is illegal.
+  ILLEGAL_MESSAGE_KEY = 5;
+  // Size of message keys exceeds the threshold.
+  MESSAGE_KEYS_TOO_LARGE = 6;
+  // Format of message group is illegal.
+  ILLEGAL_MESSAGE_GROUP = 7;
+  // Format of message property key is illegal.
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 9;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 10;
+
+  // User does not have the permission to operate.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
+  FORBIDDEN = 403;
+
+  // Code indicates that the client request has not been completed
+  // because it lacks valid authentication credentials for the
+  // requested resource.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
+  UNAUTHORIZED = 401;
+
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 13;
+
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 14;
+
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, as is violating FIFO semantics.
+  VERIFY_MESSAGE_FORBIDDEN = 15;
+
+  // Failed to consume message.
+  FAILED_TO_CONSUME_MESSAGE = 16;
+
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 17;
+
+  // Too many requests are made in short period of duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 18;
+
+  // Expired receipt-handle is used when trying to acknowledge or change
+  // invisible duration of a message
+  RECEIPT_HANDLE_EXPIRED = 19;
+
+  // Message property does not match the message type.
+  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
+
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 21;
+
+  // Transaction id is invalid.
+  INVALID_TRANSACTION_ID = 22;
+
+  // Format of filter expression is illegal.
+  ILLEGAL_FILTER_EXPRESSION = 23;
+
+  // Receipt handle of message is invalid.
+  INVALID_RECEIPT_HANDLE = 24;
+
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 25;
+
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 26;
+
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 27;
+
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 28;
+
+  // Message not found from server.
+  MESSAGE_NOT_FOUND = 29;
+
+  // Message offset is illegal.
+  ILLEGAL_MESSAGE_OFFSET = 30;
+
+  // Illegal message is for the sake of backward compatibility. In most cases,
+  // a more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
+  ILLEGAL_MESSAGE = 31;
+
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 32;
+
+  // Return different results for entries in composite request.
+  MULTIPLE_RESULTS = 33;
+
+  // Code indicates that the server encountered an unexpected condition
+  // that prevented it from fulfilling the request.
+  // This error response is a generic "catch-all" response.
+  // Usually, this indicates the server cannot find a better alternative
+  // error code to respond with. Sometimes, server administrators log error
+  // responses like the 500 status code with more details about the request
+  // to prevent the error from happening again in the future.
+  //
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
+  INTERNAL_SERVER_ERROR = 500;
+
+  // Code means that the server or client does not support the functionality
+  // required to fulfill the request.
+  NOT_IMPLEMENTED = 501;
+
+  // Code indicates that the server, while acting as a gateway or proxy,
+  // did not get a response in time from the upstream server that
+  // it needed in order to complete the request.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  GATEWAY_TIMEOUT = 504;
+}
+
+message Status {
+  Code code = 1;
+  string message = 2;
+}
+
+enum Language {
+  LANGUAGE_UNSPECIFIED = 0;
+  JAVA = 1;
+  CPP = 2;
+  DOT_NET = 3;
+  GOLANG = 4;
+  RUST = 5;
+}
+
+// User Agent
+message UA {
+  // SDK language
+  Language language = 1;
+
+  // SDK version
+  string version = 2;
+
+  // Platform details, including OS name, version, arch etc.
+  string platform = 3;
+
+  // Hostname of the node
+  string hostname = 4;
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
new file mode 100644
index 0000000..c7ce2e9
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
@@ -0,0 +1,445 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+
+import "apache/rocketmq/v2/definition.proto";
+
+package apache.rocketmq.v2;
+
+option csharp_namespace = "Apache.Rocketmq.V2";
+option java_multiple_files = true;
+option java_package = "apache.rocketmq.v2";
+option java_generate_equals_and_hash = true;
+option java_string_check_utf8 = true;
+option java_outer_classname = "MQService";
+
+// Topics are destination of messages to publish to or subscribe from. Similar
+// to domain names, they will be addressable after resolution through the
+// provided access point.
+//
+// Access points are usually the addresses of name servers, which fulfill
+// service discovery, load-balancing and other auxiliary services. Name servers
+// receive periodic heartbeats from affiliate brokers and erase those which
+// failed to maintain alive status.
+//
+// Name servers answer queries of QueryRouteRequest, responding clients with
+// addressable message-queues, which they may directly publish messages to or
+// subscribe messages from.
+//
+// QueryRouteRequest shall include source endpoints, aka, configured
+// access-point, which annotates tenant-id, instance-id or other
+// vendor-specific settings. Purpose-built name servers may respond customized
+// results based on these particular requirements.
+message QueryRouteRequest {
+  Resource topic = 1;
+  Endpoints endpoints = 2;
+}
+
+message QueryRouteResponse {
+  Status status = 1;
+
+  repeated MessageQueue message_queues = 2;
+}
+
+message SendMessageRequest {
+  repeated Message messages = 1;
+}
+
+message SendResultEntry {
+  Status status = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  int64 offset = 4;
+}
+
+message SendMessageResponse {
+  Status status = 1;
+
+  // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+  // each entry for best certainty.
+  repeated SendResultEntry entries = 2;
+}
+
+message QueryAssignmentRequest {
+  Resource topic = 1;
+  Resource group = 2;
+  Endpoints endpoints = 3;
+}
+
+message QueryAssignmentResponse {
+  Status status = 1;
+  repeated Assignment assignments = 2;
+}
+
+message ReceiveMessageRequest {
+  Resource group = 1;
+  MessageQueue message_queue = 2;
+  FilterExpression filter_expression = 3;
+  int32 batch_size = 4;
+  // Required if client type is simple consumer.
+  optional google.protobuf.Duration invisible_duration = 5;
+  // For message auto renew and clean
+  bool auto_renew = 6;
+}
+
+message ReceiveMessageResponse {
+  oneof content {
+    Status status = 1;
+    Message message = 2;
+    // The timestamp that brokers start to deliver status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
+  }
+}
+
+message AckMessageEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+}
+
+message AckMessageRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  repeated AckMessageEntry entries = 3;
+}
+
+message AckMessageResultEntry {
+  string message_id = 1;
+  string receipt_handle = 2;
+
+  // Acknowledge result may be acquired through inspecting
+  // `status.code`; In case acknowledgement failed, `status.message`
+  // is the explanation of the failure.
+  Status status = 3;
+}
+
+message AckMessageResponse {
+
+  // RPC tier status, which is used to represent RPC-level errors including
+  // authentication, authorization, throttling and other general failures.
+  Status status = 1;
+
+  repeated AckMessageResultEntry entries = 2;
+}
+
+message ForwardMessageToDeadLetterQueueRequest {
+  Resource group = 1;
+  Resource topic = 2;
+  string receipt_handle = 3;
+  string message_id = 4;
+  int32 delivery_attempt = 5;
+  int32 max_delivery_attempts = 6;
+}
+
+message ForwardMessageToDeadLetterQueueResponse { Status status = 1; }
+
+message HeartbeatRequest {
+  optional Resource group = 1;
+  ClientType client_type = 2;
+}
+
+message HeartbeatResponse { Status status = 1; }
+
+message EndTransactionRequest {
+  Resource topic = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  TransactionResolution resolution = 4;
+  TransactionSource source = 5;
+  string trace_context = 6;
+}
+
+message EndTransactionResponse { Status status = 1; }
+
+message PrintThreadStackTraceCommand { string nonce = 1; }
+
+message ThreadStackTrace {
+  string nonce = 1;
+  optional string thread_stack_trace = 2;
+}
+
+message VerifyMessageCommand {
+  string nonce = 1;
+  MessageQueue message_queue = 2;
+  Message message = 3;
+}
+
+message VerifyMessageResult {
+  string nonce = 1;
+}
+
+message RecoverOrphanedTransactionCommand {
+  MessageQueue message_queue = 1;
+  Message orphaned_transactional_message = 2;
+  string transaction_id = 3;
+}
+
+message Publishing {
+  // Publishing settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // List of topics to which messages will be published.
+  repeated Resource topics = 1;
+
+  // Publishing settings below here are from server, it is essential for
+  // server to push.
+  //
+  // Body of message will be deflated if its size in bytes exceeds the
+  // threshold.
+  int32 compress_body_threshold = 2;
+
+  // If the message body size exceeds `max_body_size`, broker servers would
+  // reject the request. As a result, it is advisable that Producer performs
+  // client-side check validation.
+  int32 max_body_size = 3;
+}
+
+message Subscription {
+  // Subscription settings below here are appointed by client, thus it is
+  // unnecessary for server to push at present.
+  //
+  // Consumer group.
+  optional Resource group = 1;
+
+  // Subscription for consumer.
+  repeated SubscriptionEntry subscriptions = 2;
+
+  // Subscription settings below here are from server, it is essential for
+  // server to push.
+  //
+  // When FIFO flag is `true`, messages of the same message group are processed
+  // in first-in-first-out manner.
+  //
+  // Brokers will not deliver further messages of the same group until prior
+  // ones are completely acknowledged.
+  optional bool fifo = 3;
+
+  // Message receive batch size here is essential for push consumer.
+  optional int32 receive_batch_size = 4;
+
+  // Long-polling timeout for `ReceiveMessageRequest`, which is essential for
+  // push consumer.
+  optional google.protobuf.Duration long_polling_timeout = 5;
+}
+
+message Metric {
+  // Indicates whether the client should export local metrics to the server.
+  bool on = 1;
+  
+  // The endpoint that client metrics should be exported to, which is required if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
+message Settings {
+  // Configurations for all clients.
+  optional ClientType client_type = 1;
+
+  optional Endpoints access_point = 2;
+
+  // If publishing of messages encounters throttling or server internal errors,
+  // publishers should implement automatic retries after progressive longer
+  // back-offs for consecutive errors.
+  //
+  // When processing message fails, `backoff_policy` describes an interval
+  // after which the message should be available to consume again.
+  //
+  // For FIFO messages, the interval should be relatively small because
+  // messages of the same message group would not be readily available until
+  // the prior one depletes its lifecycle.
+  optional RetryPolicy backoff_policy = 3;
+
+  // Request timeout for RPCs excluding long-polling.
+  optional google.protobuf.Duration request_timeout = 4;
+
+  oneof pub_sub {
+    Publishing publishing = 5;
+
+    Subscription subscription = 6;
+  }
+
+  // User agent details
+  UA user_agent = 7;
+
+  Metric metric = 8;
+}
+
+message TelemetryCommand {
+  optional Status status = 1;
+
+  oneof command {
+    // Client settings
+    Settings settings = 2;
+
+    // These messages are from client.
+    //
+    // Report thread stack trace to server.
+    ThreadStackTrace thread_stack_trace = 3;
+
+    // Report message verify result to server.
+    VerifyMessageResult verify_message_result = 4;
+
+    // These messages are from server.
+    //
+    // Request client to recover the orphaned transaction message.
+    RecoverOrphanedTransactionCommand recover_orphaned_transaction_command = 5;
+
+    // Request client to print thread stack trace.
+    PrintThreadStackTraceCommand print_thread_stack_trace_command = 6;
+
+    // Request client to verify the consumption of the appointed message.
+    VerifyMessageCommand verify_message_command = 7;
+  }
+}
+
+message NotifyClientTerminationRequest {
+  // Consumer group, which is absent for producer.
+  optional Resource group = 1;
+}
+
+message NotifyClientTerminationResponse { Status status = 1; }
+
+message ChangeInvisibleDurationRequest {
+  Resource group = 1;
+  Resource topic = 2;
+
+  // Unique receipt handle to identify message to change
+  string receipt_handle = 3;
+
+  // New invisible duration
+  google.protobuf.Duration invisible_duration = 4;
+
+  // For message tracing
+  string message_id = 5;
+}
+
+message ChangeInvisibleDurationResponse {
+  Status status = 1;
+
+  // Server may generate a new receipt handle for the message.
+  string receipt_handle = 2;
+}
+
+// For all the RPCs in MessagingService, the following error handling policies
+// apply:
+//
+// If the request doesn't bear a valid authentication credential, return a
+// response with common.status.code == `UNAUTHENTICATED`. If the authenticated
+// user is not granted with sufficient permission to execute the requested
+// operation, return a response with common.status.code == `PERMISSION_DENIED`.
+// If the per-user-resource-based quota is exhausted, return a response with
+// common.status.code == `RESOURCE_EXHAUSTED`. If any unexpected server-side
+// errors raise, return a response with common.status.code == `INTERNAL`.
+service MessagingService {
+
+  // Queries the route entries of the requested topic in the perspective of the
+  // given endpoints. On success, servers should return a collection of
+  // addressable message-queues. Note servers may return customized route
+  // entries based on endpoints provided.
+  //
+  // If the requested topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryRoute(QueryRouteRequest) returns (QueryRouteResponse) {}
+
+  // Producer or consumer sends HeartbeatRequest to servers periodically to
+  // keep-alive. Additionally, it also reports client-side configuration,
+  // including topic subscription, load-balancing group name, etc.
+  //
+  // Returns `OK` if success.
+  //
+  // If a client specifies a language that is not yet supported by servers,
+  // returns `INVALID_ARGUMENT`
+  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {}
+
+  // Delivers messages to brokers.
+  // Clients may further:
+  // 1. Refine a message destination to message-queues which fulfills parts of
+  // FIFO semantic;
+  // 2. Flag a message as transactional, which keeps it invisible to consumers
+  // until it commits;
+  // 3. Time a message, making it invisible to consumers till specified
+  // time-point;
+  // 4. And more...
+  //
+  // Returns message-id or transaction-id with status `OK` on success.
+  //
+  // If the destination topic doesn't exist, returns `NOT_FOUND`.
+  rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
+
+  // Queries the assigned route info of a topic for current consumer,
+  // the returned assignment result is decided by server-side load balancer.
+  //
+  // If the corresponding topic doesn't exist, returns `NOT_FOUND`.
+  // If the specific endpoints is empty, returns `INVALID_ARGUMENT`.
+  rpc QueryAssignment(QueryAssignmentRequest) returns (QueryAssignmentResponse) {
+  }
+
+  // Receives messages from the server in batch manner, returns a set of
+  // messages if success. The received messages should be acked or redelivered
+  // after processed.
+  //
+  // If the pending concurrent receive requests exceed the quota of the given
+  // consumer group, returns `UNAVAILABLE`. If the upstream store server hangs,
+  // return `DEADLINE_EXCEEDED` in a timely manner. If the corresponding topic
+  // or consumer group doesn't exist, returns `NOT_FOUND`. If there is no new
+  // message in the specific topic, returns `OK` with an empty message set.
+  // Please note that client may suffer from false empty responses.
+  //
+  // If failed to receive message from remote, server must return only one
+  // `ReceiveMessageResponse` as the reply to the request, whose `Status` indicates
+  // the specific reason of failure, otherwise, the reply is considered successful.
+  rpc ReceiveMessage(ReceiveMessageRequest) returns (stream ReceiveMessageResponse) {
+  }
+
+  // Acknowledges the message associated with the `receipt_handle` or `offset`
+  // in the `AckMessageRequest`, it means the message has been successfully
+  // processed. Returns `OK` if the message server removes the relevant message
+  // successfully.
+  //
+  // If the given receipt_handle is illegal or out of date, returns
+  // `INVALID_ARGUMENT`.
+  rpc AckMessage(AckMessageRequest) returns (AckMessageResponse) {}
+
+  // Forwards one message to dead letter queue if the max delivery attempts is
+  // exceeded by this message at client-side, return `OK` if success.
+  rpc ForwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest)
+      returns (ForwardMessageToDeadLetterQueueResponse) {}
+
+  // Commits or rollback one transactional message.
+  rpc EndTransaction(EndTransactionRequest) returns (EndTransactionResponse) {}
+
+  // Once a client starts, it would immediately establish bi-lateral stream
+  // RPCs with brokers, reporting its settings as the initiative command.
+  //
+  // When servers have need of inspecting client status, they would issue
+  // telemetry commands to clients. After executing received instructions,
+  // clients shall report command execution results through client-side streams.
+  rpc Telemetry(stream TelemetryCommand) returns (stream TelemetryCommand) {}
+
+  // Notify the server that the client is terminated.
+  rpc NotifyClientTermination(NotifyClientTerminationRequest) returns (NotifyClientTerminationResponse) {
+  }
+
+  // Once a message is retrieved from consume queue on behalf of the group, it
+  // will be kept invisible to other clients of the same group for a period of
+  // time. The message is supposed to be processed within the invisible
+  // duration. If the client, which is in charge of the invisible message, is
+  // not capable of processing the message timely, it may use
+  // ChangeInvisibleDuration to lengthen invisible duration.
+  rpc ChangeInvisibleDuration(ChangeInvisibleDurationRequest) returns (ChangeInvisibleDurationResponse) {
+  }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
index 9a1b66d..7d258b4 100644
--- a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -16,58 +16,59 @@
  */
 using System;
 using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class PublishLoadBalancer
     {
         public PublishLoadBalancer(TopicRouteData route)
         {
-            this.partitions = new List<Partition>();
-            foreach (var partition in route.Partitions)
+            this._messageQueues = new List<rmq::MessageQueue>();
+            foreach (var messageQueue in route.MessageQueues)
             {
-                if (Permission.NONE == partition.Permission)
+                if (rmq::Permission.Unspecified == messageQueue.Permission)
                 {
                     continue;
                 }
 
-                if (Permission.READ == partition.Permission)
+                if (rmq::Permission.Read == messageQueue.Permission)
                 {
                     continue;
                 }
 
-                this.partitions.Add(partition);
+                this._messageQueues.Add(messageQueue);
             }
 
-            this.partitions.Sort();
+            this._messageQueues.Sort(Utilities.CompareMessageQueue);
             Random random = new Random();
-            this.roundRobinIndex = random.Next(0, this.partitions.Count);
+            this._roundRobinIndex = random.Next(0, this._messageQueues.Count);
         }
 
-        public void update(TopicRouteData route)
+        /// <summary>
+        /// Replaces the candidate message queues with the writable queues of
+        /// <paramref name="route"/>. Queues whose permission is Unspecified or
+        /// Read are skipped, mirroring the filtering done by the constructor.
+        /// </summary>
+        /// <param name="route">Latest route data of the topic.</param>
+        public void Update(TopicRouteData route)
         {
-            List<Partition> partitions = new List<Partition>();
-            foreach (var partition in route.Partitions)
+            List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
+            foreach (var partition in route.MessageQueues)
             {
-                if (Permission.NONE == partition.Permission)
+                if (rmq::Permission.Unspecified == partition.Permission)
                 {
                     continue;
                 }
 
-                if (Permission.READ == partition.Permission)
+                if (rmq::Permission.Read == partition.Permission)
                 {
                     continue;
                 }
                 partitions.Add(partition);
             }
-            partitions.Sort();
-            this.partitions = partitions;
+            // rmq::MessageQueue is a generated protobuf type without IComparable, so the
+            // parameterless Sort() would throw; use the same comparison as the constructor.
+            partitions.Sort(Utilities.CompareMessageQueue);
+            this._messageQueues = partitions;
         }
 
         /**
          * Accept a partition iff its broker is different.
          */
-        private bool accept(List<Partition> existing, Partition partition)
+        private bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
         {
             if (0 == existing.Count)
             {
@@ -76,7 +77,7 @@ namespace org.apache.rocketmq
 
             foreach (var item in existing)
             {
-                if (item.Broker.Equals(partition.Broker))
+                if (item.Broker.Equals(messageQueue.Broker))
                 {
                     return false;
                 }
@@ -84,22 +85,22 @@ namespace org.apache.rocketmq
             return true;
         }
 
-        public List<Partition> select(int maxAttemptTimes)
+        public List<rmq::MessageQueue> Select(int maxAttemptTimes)
         {
-            List<Partition> result = new List<Partition>();
+            List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
 
-            List<Partition> all = this.partitions;
+            List<rmq::MessageQueue> all = this._messageQueues;
             if (0 == all.Count)
             {
                 return result;
             }
-            int start = ++roundRobinIndex;
+            int start = ++_roundRobinIndex;
             int found = 0;
 
             for (int i = 0; i < all.Count; i++)
             {
                 int idx = ((start + i) & int.MaxValue) % all.Count;
-                if (accept(result, all[idx]))
+                if (Accept(result, all[idx]))
                 {
                     result.Add(all[idx]);
                     if (++found >= maxAttemptTimes)
@@ -112,8 +113,8 @@ namespace org.apache.rocketmq
             return result;
         }
 
-        private List<Partition> partitions;
+        private List<rmq::MessageQueue> _messageQueues;
 
-        private int roundRobinIndex;
+        private int _roundRobinIndex;
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/Publishing.cs
similarity index 72%
copy from csharp/rocketmq-client-csharp/IClient.cs
copy to csharp/rocketmq-client-csharp/Publishing.cs
index 7f3ed64..ffedd17 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/Publishing.cs
@@ -15,18 +15,17 @@
  * limitations under the License.
  */
 
-using System.Threading.Tasks;
+using rmq = Apache.Rocketmq.V2;
+using System.Collections.Generic;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
+    // Settings for publishing
+    public class Publishing
     {
+        public List<rmq::Resource> Topics { get; set; }
+        public int CompressBodyThreshold { get; set; }
 
-        void heartbeat();
-
-        void healthCheck();
-
-        Task<bool> notifyClientTermination();
-
+        public int MaxBodySize { get; set; }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..cc30943
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Consumer that periodically queries the load assignments of its group and,
+    /// for every assigned message queue, runs a loop that receives messages, hands
+    /// them to the registered <see cref="IMessageListener"/> and then acknowledges
+    /// or re-schedules each message depending on the listener's outcome.
+    /// </summary>
+    public class PushConsumer : Client, IConsumer
+    {
+        // Pause applied after a failed receive/consume round so that a persistent
+        // failure (e.g. an unreachable broker) does not turn the pop loop into a hot spin.
+        private const int PopFailureBackoffMillis = 1000;
+
+        public PushConsumer(AccessPoint accessPoint, string resourceNamespace, string group) : base(accessPoint, resourceNamespace)
+        {
+            _group = group;
+            _topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
+            _topicAssignmentsMap = new ConcurrentDictionary<string, List<rmq::Assignment>>();
+            _processQueueMap = new ConcurrentDictionary<rmq::Assignment, ProcessQueue>();
+            _scanAssignmentCTS = new CancellationTokenSource();
+            _scanExpiredProcessQueueCTS = new CancellationTokenSource();
+        }
+
+        /// <summary>
+        /// Starts the base client, resolves routes of all subscribed topics, sends a
+        /// heartbeat and schedules the periodic assignment / expiry scans.
+        /// </summary>
+        /// <exception cref="System.Exception">No message listener has been registered.</exception>
+        public override async Task Start()
+        {
+            if (null == _messageListener)
+            {
+                throw new System.Exception("Bad configuration: message listener is required");
+            }
+
+            await base.Start();
+
+            // Step-1: Resolve topic routes
+            List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                queryRouteTasks.Add(GetRouteFor(item.Key, true));
+            }
+            // Await rather than block: a synchronous wait inside an async method ties up
+            // a thread and can deadlock under a synchronization context.
+            await Task.WhenAll(queryRouteTasks);
+
+            // Step-2: Send heartbeats to all involving brokers so that we may get immediate, valid assignments.
+            await Heartbeat();
+
+            // Step-3: Scan load assignments that are assigned to current client.
+            // NOTE(review): the interval argument 10 is presumably in seconds - confirm against Client.schedule.
+            schedule(async () =>
+            {
+                await scanLoadAssignments();
+            }, 10, _scanAssignmentCTS.Token);
+
+            schedule(() =>
+            {
+                ScanExpiredProcessQueue();
+            }, 10, _scanExpiredProcessQueueCTS.Token);
+        }
+
+        /// <summary>
+        /// Cancels the scheduled scans, stops every running pop loop and then shuts
+        /// down the base client.
+        /// </summary>
+        public override async Task Shutdown()
+        {
+            _scanAssignmentCTS.Cancel();
+            _scanExpiredProcessQueueCTS.Cancel();
+
+            // Drop all process queues; otherwise the pop loops would keep calling
+            // ReceiveMessage after the client has been shut down.
+            foreach (var item in _processQueueMap)
+            {
+                CancelPop(item.Key);
+            }
+
+            // Shutdown resources of the base class
+            await base.Shutdown();
+        }
+
+        /// <summary>
+        /// Queries the assignments of every subscribed topic concurrently and
+        /// reconciles the running pop loops with each non-empty result.
+        /// </summary>
+        private async Task scanLoadAssignments()
+        {
+            Logger.Debug("Start to scan load assignments from server");
+            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq::Assignment>>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                tasks.Add(scanLoadAssignment(item.Key, _group));
+            }
+            var result = await Task.WhenAll(tasks);
+
+            foreach (var assignments in result)
+            {
+                if (assignments.Count == 0)
+                {
+                    continue;
+                }
+
+                checkAndUpdateAssignments(assignments);
+            }
+            Logger.Debug("Completed scanning load assignments");
+        }
+
+        /// <summary>
+        /// Starts a fresh pop loop for every process queue that reports itself expired.
+        /// </summary>
+        private void ScanExpiredProcessQueue()
+        {
+            foreach (var item in _processQueueMap)
+            {
+                if (item.Value.Expired())
+                {
+                    Task.Run(async () =>
+                    {
+                        await ExecutePop0(item.Key);
+                    });
+                }
+            }
+        }
+
+        /// <summary>
+        /// Compares the latest assignments of one topic with the previously recorded
+        /// ones: starts popping newly assigned queues, stops popping revoked queues
+        /// and records the latest list for the next comparison.
+        /// </summary>
+        /// <param name="assignments">Latest assignments; all entries are expected to belong to the same topic.</param>
+        private void checkAndUpdateAssignments(List<rmq::Assignment> assignments)
+        {
+            if (assignments.Count == 0)
+            {
+                return;
+            }
+
+            string topic = assignments[0].MessageQueue.Topic.Name;
+
+            // Compare to generate or cancel pop-cycles
+            List<rmq::Assignment> existing;
+            _topicAssignmentsMap.TryGetValue(topic, out existing);
+
+            foreach (var assignment in assignments)
+            {
+                if (null == existing || !existing.Contains(assignment))
+                {
+                    ExecutePop(assignment);
+                }
+            }
+
+            if (null != existing)
+            {
+                foreach (var assignment in existing)
+                {
+                    if (!assignments.Contains(assignment))
+                    {
+                        Logger.Info($"Stop receiving messages from {assignment.MessageQueue.ToString()}");
+                        CancelPop(assignment);
+                    }
+                }
+            }
+
+            // Remember what we were given; without this the map stays empty and
+            // revoked assignments would never be detected on later scans.
+            _topicAssignmentsMap[topic] = assignments;
+        }
+
+        /// <summary>
+        /// Registers a process queue for the assignment and, if it was not already
+        /// registered, starts its pop loop in the background.
+        /// </summary>
+        private void ExecutePop(rmq::Assignment assignment)
+        {
+            var processQueue = new ProcessQueue();
+            if (_processQueueMap.TryAdd(assignment, processQueue))
+            {
+                Task.Run(async () =>
+                {
+                    await ExecutePop0(assignment);
+                });
+            }
+        }
+
+        /// <summary>
+        /// Pop loop of a single assignment. Runs until the assignment's process queue
+        /// has been removed or flagged as dropped. Failures of a round are logged and
+        /// followed by a short pause before the next attempt.
+        /// </summary>
+        private async Task ExecutePop0(rmq::Assignment assignment)
+        {
+            Logger.Info($"Start to pop {assignment.MessageQueue.ToString()}");
+            while (true)
+            {
+                try
+                {
+                    ProcessQueue processQueue;
+                    if (!_processQueueMap.TryGetValue(assignment, out processQueue))
+                    {
+                        break;
+                    }
+
+                    if (processQueue.Dropped)
+                    {
+                        break;
+                    }
+
+                    List<Message> messages = await base.ReceiveMessage(assignment, _group);
+                    processQueue.LastReceiveTime = System.DateTime.UtcNow;
+
+                    // TODO: cache message and dispatch them 
+
+                    List<Message> failed = new List<Message>();
+                    await _messageListener.Consume(messages, failed);
+
+                    // Messages the listener reported as failed get their invisible duration changed.
+                    foreach (var message in failed)
+                    {
+                        await base.ChangeInvisibleDuration(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                    }
+
+                    // Everything else is acknowledged.
+                    foreach (var message in messages)
+                    {
+                        if (!failed.Contains(message))
+                        {
+                            bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                            if (!success)
+                            {
+                                Logger.Warn($"Failed to ack message {message.MessageId} of topic {message.Topic}");
+                            }
+                        }
+                    }
+                }
+                catch (System.Exception e)
+                {
+                    Logger.Warn($"Failed to receive or process messages from {assignment.MessageQueue.ToString()}: {e}");
+                    await Task.Delay(PopFailureBackoffMillis);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Unregisters the process queue of the assignment, if any, and flags it as
+        /// dropped so that its pop loop terminates.
+        /// </summary>
+        private void CancelPop(rmq::Assignment assignment)
+        {
+            // TryRemove is a single atomic operation on the concurrent dictionary.
+            ProcessQueue processQueue;
+            if (_processQueueMap.TryRemove(assignment, out processQueue))
+            {
+                processQueue.Dropped = true;
+            }
+        }
+
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+        }
+
+        /// <summary>
+        /// Subscribes to <paramref name="topic"/> with the given filter, replacing any
+        /// filter previously stored for the same topic.
+        /// </summary>
+        public void Subscribe(string topic, string expression, ExpressionType type)
+        {
+            var filterExpression = new FilterExpression(expression, type);
+            _topicFilterExpressionMap[topic] = filterExpression;
+
+        }
+
+        /// <summary>
+        /// Registers the listener that consumes received messages. A null listener is ignored.
+        /// </summary>
+        public void RegisterListener(IMessageListener listener)
+        {
+            if (null != listener)
+            {
+                _messageListener = listener;
+            }
+        }
+
+        private readonly string _group;
+
+        private readonly ConcurrentDictionary<string, FilterExpression> _topicFilterExpressionMap;
+        private IMessageListener _messageListener;
+
+        private readonly CancellationTokenSource _scanAssignmentCTS;
+
+        // Assignments recorded by the most recent scan, keyed by topic name.
+        private readonly ConcurrentDictionary<string, List<rmq::Assignment>> _topicAssignmentsMap;
+
+        private readonly ConcurrentDictionary<rmq::Assignment, ProcessQueue> _processQueueMap;
+
+        private readonly CancellationTokenSource _scanExpiredProcessQueueCTS;
+
+    }
+
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index 0191e91..c1f1cd6 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -15,47 +15,176 @@
  * limitations under the License.
  */
 
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Net.Security;
+using System.Threading;
 using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+using Grpc.Net.Client;
+using NLog;
 
-namespace org.apache.rocketmq {
-    public class RpcClient : IRpcClient {
-        public RpcClient(MessagingService.MessagingServiceClient client) {
-            stub = client;
+namespace Org.Apache.Rocketmq
+{
+    public class RpcClient : IRpcClient
+    {
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private readonly rmq::MessagingService.MessagingServiceClient _stub;
+        private readonly GrpcChannel _channel;
+
+        public RpcClient(string target)
+        {
+            _channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+            {
+                HttpHandler = CreateHttpHandler()
+            });
+            var invoker = _channel.Intercept(new ClientLoggerInterceptor());
+            _stub = new rmq::MessagingService.MessagingServiceClient(invoker);
+        }
+
+        public async Task Shutdown()
+        {
+            if (null != _channel)
+            {
+                await _channel.ShutdownAsync();
+            }
+        }
+
+        /**
+         * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
+         * why parameters are configured this way.
+         */
+        private HttpMessageHandler CreateHttpHandler()
+        {
+            var sslOptions = new SslClientAuthenticationOptions();
+            // Disable server certificate validation during development phase.
+            // Comment out the following line if server certificate validation is required. 
+            sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+                EnableMultipleHttp2Connections = true,
+                SslOptions = sslOptions,
+            };
+            return handler;
+        }
+
+        public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
+        {
+            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+            var callOptions = new CallOptions(metadata, deadline);
+            return _stub.Telemetry(callOptions);
+        }
+
+        public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.QueryRouteAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
+
+
+        public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.HeartbeatAsync(request, callOptions);
+            return await call.ResponseAsync;
         }
 
-        public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions)
+        public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request,
+            TimeSpan timeout)
         {
-            var call = stub.QueryRouteAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            var status = call.GetStatus();
-            if (status.StatusCode != grpc.StatusCode.OK) {
-                //TODO: Something is wrong, raise an exception here.
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.SendMessageAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
+
+        public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata, rmq::QueryAssignmentRequest request,
+            TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.QueryAssignmentAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
+
+        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, 
+            rmq::ReceiveMessageRequest request, TimeSpan timeout) {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+            var call = _stub.ReceiveMessage(request, callOptions);
+            var result = new List<rmq::ReceiveMessageResponse>();
+            var stream = call.ResponseStream;
+            while (await stream.MoveNext())
+            {
+                var entry = stream.Current;
+                Logger.Debug($"Got ReceiveMessageResponse {entry}");
+                result.Add(entry);
             }
-            return response;
+            Logger.Debug($"Receiving of messages completed");
+            return result;
         }
 
-        public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions)
+        public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+            TimeSpan timeout)
         {
-            var call = stub.HeartbeatAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.AckMessageAsync(request, callOptions);
+            return await call.ResponseAsync;
         }
 
-        public async Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions)
+        public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata, rmq::ChangeInvisibleDurationRequest request,
+            TimeSpan timeout)
         {
-            var call = stub.NotifyClientTerminationAsync(request, callOptions);
-            var response = await call.ResponseAsync;
-            return response;
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.ChangeInvisibleDurationAsync(request, callOptions);
+            return await call.ResponseAsync;
         }
 
-        public async Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions)
+        public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(Metadata metadata,
+            rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
         {
-            var call = stub.SendMessageAsync(request, callOptions);
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.ForwardMessageToDeadLetterQueueAsync(request, callOptions);
             return await call.ResponseAsync;
         }
 
-        private MessagingService.MessagingServiceClient stub;
+        public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata, rmq::EndTransactionRequest request,
+            TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.EndTransactionAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
+
+        public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+            rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        {
+            var deadline = DateTime.UtcNow.Add(timeout);
+            var callOptions = new CallOptions(metadata, deadline);
+
+            var call = _stub.NotifyClientTerminationAsync(request, callOptions);
+            return await call.ResponseAsync;
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
similarity index 58%
copy from csharp/rocketmq-client-csharp/TopicRouteException.cs
copy to csharp/rocketmq-client-csharp/SendReceipt.cs
index b520e72..0f29991 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -14,15 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-namespace org.apache.rocketmq
+
+namespace Org.Apache.Rocketmq
 {
-    public class TopicRouteException : Exception
+    public sealed class SendReceipt
     {
-        public TopicRouteException(string message) : base(message)
+        public SendReceipt(string messageId)
         {
+            status_ = SendStatus.SEND_OK;
+            messageId_ = messageId;
+        }
 
+        public SendReceipt(string messageId, SendStatus status)
+        {
+            status_ = status;
+            messageId_ = messageId;
         }
 
+        private string messageId_;
+
+        public string MessageId
+        {
+            get { return messageId_; }
+        }
+
+
+        private SendStatus status_;
+
+        public SendStatus Status
+        {
+            get { return status_; }
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendStatus.cs b/csharp/rocketmq-client-csharp/SendStatus.cs
index 8964211..7586d22 100644
--- a/csharp/rocketmq-client-csharp/SendStatus.cs
+++ b/csharp/rocketmq-client-csharp/SendStatus.cs
@@ -15,8 +15,10 @@
  * limitations under the License.
  */
 
-namespace org.apache.rocketmq {
-    public enum SendStatus {
+namespace Org.Apache.Rocketmq
+{
+    public enum SendStatus
+    {
         SEND_OK,
         FLUSH_DISK_TIMEOUT,
         FLUSH_SLAVE_TIMEOUT,
diff --git a/csharp/rocketmq-client-csharp/SequenceGenerator.cs b/csharp/rocketmq-client-csharp/SequenceGenerator.cs
index aa92c80..97a1eb9 100644
--- a/csharp/rocketmq-client-csharp/SequenceGenerator.cs
+++ b/csharp/rocketmq-client-csharp/SequenceGenerator.cs
@@ -17,8 +17,9 @@
 using System;
 using System.Threading;
 using System.Net.NetworkInformation;
+using NLog;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     /**
      * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
@@ -27,6 +28,7 @@ namespace org.apache.rocketmq
      */
     public sealed class SequenceGenerator
     {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         public static SequenceGenerator Instance
         {
@@ -94,10 +96,11 @@ namespace org.apache.rocketmq
             {
                 if (nic.OperationalStatus == OperationalStatus.Up)
                 {
-                    if (nic.Name.Equals("lo"))
+                    if (nic.Name.StartsWith("lo"))
                     {
                         continue;
                     }
+                    Logger.Debug($"NIC={nic.Name}");
                     return nic.GetPhysicalAddress().GetAddressBytes();
                 }
             }
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
new file mode 100644
index 0000000..a6be057
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using grpc = global::Grpc.Core;
+using NLog;
+using rmq = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Wraps one bidirectional telemetry stream to a server: sends the client
+    /// settings, then reads telemetry commands until the client's telemetry
+    /// cancellation is requested or the server completes the stream.
+    /// </summary>
+    public class Session
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+        public Session(string target,
+            grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+            IClient client)
+        {
+            this._target = target;
+            this._stream = stream;
+            this._client = client;
+            // One-shot signal for the outcome of the settings negotiation. Continuations
+            // run asynchronously so that awaiting callers never execute inline on the read loop.
+            this._negotiation = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
+        }
+
+        /// <summary>
+        /// Writes the client settings to the stream and processes incoming telemetry
+        /// commands. The first Settings command (or the first command without a payload,
+        /// which is treated as a failure) completes the negotiation signal. When the loop
+        /// ends for any reason, pending negotiation waiters are released.
+        /// </summary>
+        public async Task Loop()
+        {
+            try
+            {
+                var reader = this._stream.ResponseStream;
+                var writer = this._stream.RequestStream;
+                var request = new rmq::TelemetryCommand();
+                request.Settings = new rmq::Settings();
+                _client.BuildClientSetting(request.Settings);
+                await writer.WriteAsync(request);
+                Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
+                while (!_client.TelemetryCts().IsCancellationRequested)
+                {
+                    // MoveNext returning false means the server completed the stream; looping
+                    // on would spin without ever receiving another command.
+                    if (!await reader.MoveNext(_client.TelemetryCts().Token))
+                    {
+                        Logger.Info("Telemetry stream completed by server");
+                        break;
+                    }
+
+                    var cmd = reader.Current;
+                    Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
+                    switch (cmd.CommandCase)
+                    {
+                        case rmq::TelemetryCommand.CommandOneofCase.None:
+                            {
+                                Logger.Warn($"Telemetry failed: {cmd.Status}");
+                                // Only the first outcome is recorded; later calls are no-ops.
+                                _negotiation.TrySetResult(false);
+                                break;
+                            }
+                        case rmq::TelemetryCommand.CommandOneofCase.Settings:
+                            {
+                                _negotiation.TrySetResult(true);
+
+                                Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+                                _client.OnSettingsReceived(cmd.Settings);
+                                break;
+                            }
+                        case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
+                            {
+                                break;
+                            }
+                        case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
+                            {
+                                break;
+                            }
+                        case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
+                            {
+                                break;
+                            }
+                    }
+                }
+
+                if (_client.TelemetryCts().IsCancellationRequested)
+                {
+                    Logger.Info("Telemetry stream cancelled");
+                }
+                await writer.CompleteAsync();
+            }
+            finally
+            {
+                // Never leave callers of AwaitSettingNegotiationCompletion waiting on a
+                // stream that has already ended; no-op if an outcome was recorded earlier.
+                _negotiation.TrySetResult(false);
+            }
+        }
+
+        private string _target;
+
+        public string Target
+        {
+            get { return _target; }
+        }
+
+        /// <summary>
+        /// Completes once the settings negotiation has an outcome (success or failure).
+        /// Returns immediately if the outcome is already known; may be awaited by any
+        /// number of callers.
+        /// </summary>
+        public async Task AwaitSettingNegotiationCompletion()
+        {
+            if (_negotiation.Task.IsCompleted)
+            {
+                return;
+            }
+
+            Logger.Debug("Await setting negotiation");
+            await _negotiation.Task;
+        }
+
+        private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+        private IClient _client;
+
+        // Result is true when settings were received, false when telemetry failed or the stream ended first.
+        private readonly TaskCompletionSource<bool> _negotiation;
+    };
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 70e038a..2331b53 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -19,29 +19,38 @@ using System.Text;
 using grpc = global::Grpc.Core;
 using System.Security.Cryptography;
 
-namespace org.apache.rocketmq {
-    public class Signature {
-        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
+namespace Org.Apache.Rocketmq
+{
+    public class Signature
+    {
+        public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+        {
             metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
             metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
-            if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
+            metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
+            if (!String.IsNullOrEmpty(clientConfig.tenantId()))
+            {
                 metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
             }
 
-            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) {
+            if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
+            {
                 metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace());
             }
 
             string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT);
             metadata.Add(MetadataConstants.DATE_TIME_KEY, time);
 
-            if (null != clientConfig.credentialsProvider()) {
+            if (null != clientConfig.credentialsProvider())
+            {
                 var credentials = clientConfig.credentialsProvider().getCredentials();
-                if (null == credentials || credentials.expired()) {
+                if (null == credentials || credentials.expired())
+                {
                     return;
                 }
 
-                if (!String.IsNullOrEmpty(credentials.SessionToken)) {
+                if (!String.IsNullOrEmpty(credentials.SessionToken))
+                {
                     metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken);
                 }
 
@@ -50,7 +59,7 @@ namespace org.apache.rocketmq {
                 HMACSHA1 signer = new HMACSHA1(secretData);
                 byte[] digest = signer.ComputeHash(data);
                 string hmac = BitConverter.ToString(digest).Replace("-", "");
-                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}", 
+                string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
                     MetadataConstants.ALGORITHM_KEY,
                     MetadataConstants.CREDENTIAL_KEY,
                     credentials.AccessKey,
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 0000000..154efa0
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+using System.Threading;
+using Grpc.Core;
+using System.Collections.Generic;
+using System.Linq;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Org.Apache.Rocketmq
+{
+    public class SimpleConsumer : Client
+    {
+
+        public SimpleConsumer(AccessPoint accessPoint,
+        string resourceNamespace, string group)
+        : base(accessPoint, resourceNamespace)
+        {
+            _fifo = false;
+            _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+            _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+            _group = group;
+        }
+
+        public override void BuildClientSetting(rmq::Settings settings)
+        {
+            base.BuildClientSetting(settings);
+
+            settings.ClientType = rmq::ClientType.SimpleConsumer;
+            settings.Subscription = new rmq::Subscription();
+            settings.Subscription.Group = new rmq::Resource();
+            settings.Subscription.Group.Name = _group;
+            settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+
+            foreach (var kv in _subscriptions)
+            {
+                settings.Subscription.Subscriptions.Add(kv.Value);
+            }
+        }
+
+        // Starts the base client, then schedules recurring assignment scans and runs a first scan.
+        public override async Task Start()
+        {
+            await base.Start();
+
+            // schedule is handed 30 (presumably the period in seconds -- TODO confirm against
+            // Client.schedule) plus the cancellation token, so the callback performs one scan per
+            // invocation; a delay-free loop here would issue QueryAssignment calls back-to-back.
+            schedule(async () =>
+            {
+                await ScanLoadAssignments();
+            }, 30, _scanAssignmentCts.Token);
+
+            await ScanLoadAssignments();
+        }
+
+        public override async Task Shutdown()
+        {
+            // Stop the scan scheduled by Start(); nothing else in this class cancels the token.
+            _scanAssignmentCts.Cancel();
+            await base.Shutdown();
+            var notified = await NotifyClientTermination();
+            if (!notified) { Logger.Warn("Failed to NotifyClientTermination"); }
+        }
+
+        // Queries the server for the load assignments of every subscribed topic and caches
+        // the non-empty results in _topicAssignments.
+        private async Task ScanLoadAssignments()
+        {
+            List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
+            List<string> topics = new List<string>();
+            foreach (var sub in _subscriptions)
+            {
+                var request = new rmq::QueryAssignmentRequest();
+                request.Topic = new rmq::Resource();
+                request.Topic.ResourceNamespace = ResourceNamespace;
+                request.Topic.Name = sub.Key;
+                topics.Add(sub.Key);
+                request.Group = new rmq::Resource();
+                request.Group.Name = _group;
+                request.Group.ResourceNamespace = ResourceNamespace;
+
+                request.Endpoints = new rmq::Endpoints();
+                request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
+                var address = new rmq::Address();
+                address.Host = _accessPoint.Host;
+                address.Port = _accessPoint.Port;
+                request.Endpoints.Addresses.Add(address);
+
+                var metadata = new Metadata();
+                Signature.sign(this, metadata);
+                tasks.Add(Manager.QueryLoadAssignment(_accessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+            }
+
+            // Results come back in the same order as tasks, so list[i] belongs to topics[i].
+            List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
+            for (var i = 0; i < list.Length; i++)
+            {
+                string topic = topics[i];
+                var assignments = list[i];
+                if (null == assignments || 0 == assignments.Count)
+                {
+                    // Keep whatever was cached for this topic by an earlier scan.
+                    Logger.Warn($"Failed to acquire assignments. Topic={topic}, Group={_group}");
+                    continue;
+                }
+                Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
+                _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
+            }
+        }
+
+        protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+        {
+            request.ClientType = rmq::ClientType.SimpleConsumer;
+            request.Group = new rmq::Resource();
+            request.Group.Name = _group;
+            request.Group.ResourceNamespace = ResourceNamespace;
+        }
+
+        public void Subscribe(string topic, rmq::FilterType filterType, string expression)
+        {
+            var entry = new rmq::SubscriptionEntry();
+            entry.Topic = new rmq::Resource();
+            entry.Topic.Name = topic;
+            entry.Topic.ResourceNamespace = ResourceNamespace;
+            entry.Expression = new rmq::FilterExpression();
+            entry.Expression.Type = filterType;
+            entry.Expression.Expression = expression;
+            _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
+            AddTopicOfInterest(topic);
+        }
+
+        public override void OnSettingsReceived(rmq.Settings settings)
+        {
+            base.OnSettingsReceived(settings);
+            // Subscription is a protobuf message field and reads as null when the server left it unset.
+            if (null != settings.Subscription && settings.Subscription.Fifo)
+            {
+                _fifo = true;
+                Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
+            }
+        }
+
+        public async Task<List<Message>> Receive(int batchSize, TimeSpan timeout)
+        {
+            var messageQueue = NextQueue();
+            if (null == messageQueue)
+            {
+                Logger.Debug("NextQueue returned null");
+                return new List<Message>();
+            }
+
+            var request = new rmq.ReceiveMessageRequest();
+            request.Group = new rmq.Resource();
+            request.Group.ResourceNamespace = ResourceNamespace;
+            request.Group.Name = _group;
+
+            request.MessageQueue = new rmq.MessageQueue();
+            request.MessageQueue.MergeFrom(messageQueue);
+            request.BatchSize = batchSize;
+            
+            // Client is responsible for extending message invisibility duration
+            request.AutoRenew = false;
+            
+            var targetUrl = Utilities.TargetUrl(messageQueue);
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+            
+            return await Manager.ReceiveMessage(targetUrl, metadata, request, timeout);
+        }
+
+
+        public async Task Ack(Message message)
+        {
+            var request = new rmq.AckMessageRequest();
+            request.Group = new rmq.Resource();
+            request.Group.ResourceNamespace = ResourceNamespace;
+            request.Group.Name = _group;
+
+            request.Topic = new rmq.Resource();
+            request.Topic.ResourceNamespace = ResourceNamespace;
+            request.Topic.Name = message.Topic;
+            
+            var entry = new rmq.AckMessageEntry();
+            request.Entries.Add(entry);
+            entry.MessageId = message.MessageId;
+            entry.ReceiptHandle = message._receiptHandle;
+
+            var targetUrl = message._sourceHost;
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+            await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
+        }
+
+        public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
+        {
+            var request = new rmq.ChangeInvisibleDurationRequest();
+            request.Group = new rmq.Resource();
+            request.Group.ResourceNamespace = ResourceNamespace;
+            request.Group.Name = _group;
+
+            request.Topic = new rmq.Resource();
+            request.Topic.ResourceNamespace = ResourceNamespace;
+            request.Topic.Name = message.Topic;
+
+            request.ReceiptHandle = message._receiptHandle;
+            request.MessageId = message.MessageId;
+            
+            request.InvisibleDuration = Duration.FromTimeSpan(invisibleDuration);
+
+            var targetUrl = message._sourceHost;
+            var metadata = new Metadata();
+            Signature.sign(this, metadata);
+            await Manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
+        }
+        
+        // Picks the next message queue to receive from, rotating over topics and their
+        // assigned queues with per-thread counters. Returns null when nothing is assigned.
+        private rmq.MessageQueue NextQueue()
+        {
+            if (_topicAssignments.IsEmpty)
+            {
+                return null;
+            }
+
+            UInt32 topicSeq = CurrentTopicSequence.Value;
+            CurrentTopicSequence.Value = topicSeq + 1;
+
+            // Round-robin over the topics that currently have cached assignments.
+            var total = _topicAssignments.Count;
+            var topicIndex = topicSeq % total;
+            var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
+
+            UInt32 queueSeq = CurrentQueueSequence.Value;
+            CurrentQueueSequence.Value = queueSeq + 1;
+            List<rmq.Assignment> assignments;
+            // An empty list must be rejected as well: queueSeq % 0 throws DivideByZeroException.
+            if (!_topicAssignments.TryGetValue(topic, out assignments)
+                || null == assignments || 0 == assignments.Count)
+            {
+                return null;
+            }
+
+            var idx = queueSeq % assignments.Count;
+            return assignments[(int)idx].MessageQueue;
+        }
+
+        private ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>(true);
+        private ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>(true);
+
+        private readonly string _group;
+        private bool _fifo;
+        private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
+        private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
+        private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/StaticCredentialsProvider.cs b/csharp/rocketmq-client-csharp/StaticCredentialsProvider.cs
index 301613b..edd810d 100644
--- a/csharp/rocketmq-client-csharp/StaticCredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/StaticCredentialsProvider.cs
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-namespace org.apache.rocketmq {
-    public class StaticCredentialsProvider : ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+    public class StaticCredentialsProvider : ICredentialsProvider
+    {
 
-        public StaticCredentialsProvider(string accessKey, string accessSecret) {
+        public StaticCredentialsProvider(string accessKey, string accessSecret)
+        {
             this.accessKey = accessKey;
             this.accessSecret = accessSecret;
         }
 
-        public Credentials getCredentials() {
+        public Credentials getCredentials()
+        {
             return new Credentials(accessKey, accessSecret);
         }
 
diff --git a/csharp/rocketmq-client-csharp/Topic.cs b/csharp/rocketmq-client-csharp/Topic.cs
index dcc7100..f1ae453 100644
--- a/csharp/rocketmq-client-csharp/Topic.cs
+++ b/csharp/rocketmq-client-csharp/Topic.cs
@@ -17,50 +17,68 @@
 
 using System;
 
-namespace org.apache.rocketmq {
-    public class Topic : IComparable<Topic>, IEquatable<Topic> {
-        public Topic(string resource_namespace, string name) {
-            resourceNamespace = resource_namespace;
-            this.name = name;
+namespace Org.Apache.Rocketmq
+{
+    public class Topic : IComparable<Topic>, IEquatable<Topic>
+    {
+        public Topic(string resourceNamespace, string name)
+        {
+            ResourceNamespace = resourceNamespace;
+            Name = name;
         }
 
-        private string resourceNamespace;
-        public string ResourceNamespace {
-            get { return resourceNamespace; }
-        }
+        public string ResourceNamespace { get; }
+        public string Name { get; }
 
-        private string name;
-        public string Name {
-            get { return name; }
-        }
-
-        public int CompareTo(Topic other) {
-            if (0 != resourceNamespace.CompareTo(other.resourceNamespace)) {
-                return resourceNamespace.CompareTo(other.resourceNamespace);
+        public bool Equals(Topic other)
+        {
+            if (ReferenceEquals(null, other))
+            {
+                return false;
             }
 
-            if (0 != name.CompareTo(other.name)) {
-                return name.CompareTo(other.name);
+            if (ReferenceEquals(this, other))
+            {
+                return true;
             }
 
-            return 0;
-        }
-
-        public bool Equals(Topic other) {
-            return resourceNamespace.Equals(other.resourceNamespace) && name.Equals(other.name);
+            return ResourceNamespace == other.ResourceNamespace && Name == other.Name;
         }
 
-        public override bool Equals(Object other) {
-            if (!(other is Topic)) {
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj) || obj.GetType() != GetType())
+            {
                 return false;
             }
-            return Equals(other as Topic);
+
+            if (ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            return Equals((Topic)obj);
         }
 
         public override int GetHashCode()
         {
-            return HashCode.Combine(resourceNamespace, name);
+            return HashCode.Combine(ResourceNamespace, Name);
         }
 
+        public int CompareTo(Topic other)
+        {
+            if (ReferenceEquals(null, other))
+            {
+                return -1;
+            }
+
+            var compareTo = String.CompareOrdinal(ResourceNamespace, other.ResourceNamespace);
+            if (0 == compareTo)
+            {
+                compareTo = String.CompareOrdinal(Name, other.Name);
+            }
+
+            return compareTo;
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index a860669..e4aa04c 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -14,43 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
 using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
-
-    public class TopicRouteData : IEquatable<TopicRouteData> {
+namespace Org.Apache.Rocketmq
+{
+    public class TopicRouteData : IEquatable<TopicRouteData>
+    {
+        public TopicRouteData(List<rmq::MessageQueue> partitions)
+        {
+            _messageQueues = partitions;
 
-        public TopicRouteData(List<Partition> partitions) {
-            this.partitions = partitions;
-            this.partitions.Sort();
+            _messageQueues.Sort(Utilities.CompareMessageQueue);
         }
 
-        private List<Partition> partitions;
-
-        public List<Partition> Partitions {
-            get { return partitions; }
-        }
+        private List<rmq::MessageQueue> _messageQueues;
+        public List<rmq::MessageQueue> MessageQueues { get { return _messageQueues; } }
 
-        public bool Equals(TopicRouteData other) {
-            return partitions.Equals(other.partitions);
+        public bool Equals(TopicRouteData other)
+        {
+            if (ReferenceEquals(null, other)) return false;
+            if (ReferenceEquals(this, other)) return true;
+            return Equals(_messageQueues, other._messageQueues);
         }
 
-        public override bool Equals(object other)
+        public override bool Equals(object obj)
         {
-
-            if (!(other is TopicRouteData)) {
-                return false;
-            }
-
-            return Equals(other as TopicRouteData);
+            if (ReferenceEquals(null, obj)) return false;
+            if (ReferenceEquals(this, obj)) return true;
+            if (obj.GetType() != this.GetType()) return false;
+            return Equals((TopicRouteData)obj);
         }
 
         public override int GetHashCode()
         {
-            return HashCode.Combine(partitions);
+            return (_messageQueues != null ? _messageQueues.GetHashCode() : 0);
         }
-
     }
-
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/TopicRouteException.cs
index b520e72..75462fd 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteException.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 using System;
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public class TopicRouteException : Exception
     {
diff --git a/csharp/rocketmq-client-csharp/Utilities.cs b/csharp/rocketmq-client-csharp/Utilities.cs
index 1834a77..23ed8db 100644
--- a/csharp/rocketmq-client-csharp/Utilities.cs
+++ b/csharp/rocketmq-client-csharp/Utilities.cs
@@ -19,8 +19,10 @@ using System.Diagnostics;
 using System.Linq;
 using System.Net.NetworkInformation;
 using System.Text;
+using System;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     public static class Utilities
     {
@@ -49,5 +51,29 @@ namespace org.apache.rocketmq
 
             return result.ToString();
         }
+
+        public static string TargetUrl(rmq::MessageQueue messageQueue)
+        {
+            // TODO: Assert associated broker has at least one service endpoint.
+            var serviceEndpoint = messageQueue.Broker.Endpoints.Addresses[0];
+            return $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
+        }
+
+        // Orders message queues by topic namespace, topic name, broker name, then queue id.
+        public static int CompareMessageQueue(rmq::MessageQueue lhs, rmq::MessageQueue rhs)
+        {
+            // Compare namespace and name separately: concatenating them makes ("ab", "c") and
+            // ("a", "bc") indistinguishable. Ordinal comparison keeps the order culture-independent.
+            int result = String.CompareOrdinal(lhs.Topic.ResourceNamespace, rhs.Topic.ResourceNamespace);
+            if (0 == result)
+            {
+                result = String.CompareOrdinal(lhs.Topic.Name, rhs.Topic.Name);
+            }
+            if (0 == result)
+            {
+                result = String.CompareOrdinal(lhs.Broker.Name, rhs.Broker.Name);
+            }
+            return 0 != result ? result : lhs.Id.CompareTo(rhs.Id);
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 24cc710..baf103f 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -6,25 +6,29 @@
     <Authors>Zhanhui Li</Authors>
     <Company>Apache Software Foundation</Company>
     <TargetFramework>net5.0</TargetFramework>
-    <RootNamespace>org.apache.rocketmq</RootNamespace>
+    <RootNamespace>Org.Apache.Rocketmq</RootNamespace>
     <GeneratePackageOnBuild>true</GeneratePackageOnBuild>
   </PropertyGroup>
 
   <ItemGroup>
+    <PackageReference Include="Crc32.NET" Version="1.2.0" />
     <PackageReference Include="Google.Protobuf" Version="3.19.4" />
-    <PackageReference Include="Grpc.Net.Client" Version="2.42.0" />
+    <PackageReference Include="Grpc.Net.Client" Version="2.43.0" />
     <PackageReference Include="Grpc.Tools" Version="2.43.0">
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
       <PrivateAssets>all</PrivateAssets>
     </PackageReference>
     <PackageReference Include="NLog" Version="4.7.13" />
+    <PackageReference Include="OpenTelemetry" Version="1.3.0" />
+    <PackageReference Include="OpenTelemetry.Api" Version="1.3.0" />
+    <PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.0" />
 
-    <Protobuf Include="Protos\apache\rocketmq\v1\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />
+    <Protobuf Include="Protos\apache\rocketmq\v2\definition.proto" ProtoRoot="Protos" GrpcServices="Client" />
     <Protobuf Include="Protos\google\rpc\code.proto" ProtoRoot="Protos" GrpcServices="Client" />
     <Protobuf Include="Protos\google\rpc\error_details.proto" ProtoRoot="Protos" GrpcServices="Client" />
     <Protobuf Include="Protos\google\rpc\status.proto" ProtoRoot="Protos" GrpcServices="Client" />
-    <Protobuf Include="Protos\apache\rocketmq\v1\service.proto" ProtoRoot="Protos" GrpcServices="Client">
-      <Link>Protos\apache\rocketmq\v1\definition.proto</Link>
+    <Protobuf Include="Protos\apache\rocketmq\v2\service.proto" ProtoRoot="Protos" GrpcServices="Client">
+      <Link>Protos\apache\rocketmq\v2\definition.proto</Link>
       <Link>Protos\google\rpc\status.proto</Link>
       <Link>Protos\google\rpc\error_details.proto</Link>
     </Protobuf>
diff --git a/csharp/tests/ClientConfigTest.cs b/csharp/tests/ClientConfigTest.cs
index c6d83cf..4d8dec1 100644
--- a/csharp/tests/ClientConfigTest.cs
+++ b/csharp/tests/ClientConfigTest.cs
@@ -17,11 +17,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ClientConfigTest {
+    public class ClientConfigTest
+    {
         [TestMethod]
-        public void testClientId() {
+        public void testClientId()
+        {
             var clientConfig = new ClientConfig();
             string clientId = clientConfig.clientId();
             Assert.IsTrue(clientId.Contains("@"));
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
index 0f8bff7..af5983c 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 using System;
+using Grpc.Core;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using rmq = apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-using System.Threading;
-using System.Threading.Tasks;
+using rmq = Apache.Rocketmq.V2;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class ClientManagerTest {
-        
+    public class ClientManagerTest
+    {
+
         [TestMethod]
-        public void testResolveRoute() {
+        public void TestResolveRoute()
+        {
             string topic = "cpp_sdk_standard";
             string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
             var request = new rmq::QueryRouteRequest();
@@ -41,7 +42,7 @@ namespace org.apache.rocketmq {
             address.Port = 80;
             request.Endpoints.Addresses.Add(address);
 
-            var metadata = new grpc::Metadata();
+            var metadata = new Metadata();
             var clientConfig = new ClientConfig();
             var credentialsProvider = new ConfigFileCredentialsProvider();
             clientConfig.CredentialsProvider = credentialsProvider;
@@ -50,7 +51,7 @@ namespace org.apache.rocketmq {
             Signature.sign(clientConfig, metadata);
             var clientManager = new ClientManager();
             string target = "https://116.62.231.199:80";
-            var topicRouteData = clientManager.resolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            var topicRouteData = clientManager.ResolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
             Console.WriteLine(topicRouteData);
         }
     }
diff --git a/csharp/tests/ConfigFileCredentialsProviderTest.cs b/csharp/tests/ConfigFileCredentialsProviderTest.cs
index f94d364..7741295 100644
--- a/csharp/tests/ConfigFileCredentialsProviderTest.cs
+++ b/csharp/tests/ConfigFileCredentialsProviderTest.cs
@@ -18,11 +18,14 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class ConfigFileCredentialsProviderTest {
+    public class ConfigFileCredentialsProviderTest
+    {
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var provider = new ConfigFileCredentialsProvider();
             var credentials = provider.getCredentials();
             Assert.IsNotNull(credentials);
diff --git a/csharp/tests/DateTimeTest.cs b/csharp/tests/DateTimeTest.cs
index 568d59e..fdf7d53 100644
--- a/csharp/tests/DateTimeTest.cs
+++ b/csharp/tests/DateTimeTest.cs
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 
-namespace org.apache.rocketmq {
-    
+namespace Org.Apache.Rocketmq
+{
+
     [TestClass]
-    public class DateTimeTest {
-        
+    public class DateTimeTest
+    {
+
         [TestMethod]
-        public void testFormat() {
+        public void testFormat()
+        {
             DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56);
-            string time =  instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
+            string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
             string expected = "20220215T083156Z";
             Assert.AreEqual(time, expected);
         }
diff --git a/csharp/tests/MessageIdGeneratorTest.cs b/csharp/tests/MessageIdGeneratorTest.cs
index 6ed34d6..c98e113 100644
--- a/csharp/tests/MessageIdGeneratorTest.cs
+++ b/csharp/tests/MessageIdGeneratorTest.cs
@@ -16,7 +16,7 @@
  */
 
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
 
 namespace tests
 {
diff --git a/csharp/tests/MessageTest.cs b/csharp/tests/MessageTest.cs
index 3dd7f4b..f1c71f8 100644
--- a/csharp/tests/MessageTest.cs
+++ b/csharp/tests/MessageTest.cs
@@ -19,12 +19,15 @@ using System;
 using System.Text;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class MessageTest {
+    public class MessageTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             var msg1 = new Message();
             Assert.IsNotNull(msg1.MessageId);
             Assert.IsTrue(msg1.MessageId.StartsWith("01"));
@@ -36,7 +39,8 @@ namespace org.apache.rocketmq {
         }
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -49,7 +53,8 @@ namespace org.apache.rocketmq {
         }
 
         [TestMethod]
-        public void testCtor3() {
+        public void testCtor3()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -63,7 +68,8 @@ namespace org.apache.rocketmq {
         }
 
         [TestMethod]
-        public void testCtor4() {
+        public void testCtor4()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -81,7 +87,8 @@ namespace org.apache.rocketmq {
         }
 
         [TestMethod]
-        public void testCtor5() {
+        public void testCtor5()
+        {
             string topic = "T1";
             string bodyString = "body";
             byte[] body = Encoding.ASCII.GetBytes(bodyString);
diff --git a/csharp/tests/MqLogManagerTest.cs b/csharp/tests/MqLogManagerTest.cs
index 71be3f5..4d163b2 100644
--- a/csharp/tests/MqLogManagerTest.cs
+++ b/csharp/tests/MqLogManagerTest.cs
@@ -1,7 +1,7 @@
 using System;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using NLog;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
 
 namespace tests
 {
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index 961b167..663980a 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -15,60 +15,173 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Collections.Generic;
 using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
 
-namespace org.apache.rocketmq
-{
 
+namespace tests
+{
     [TestClass]
     public class ProducerTest
     {
 
+        private static AccessPoint _accessPoint;
+
         [ClassInitialize]
         public static void SetUp(TestContext context)
         {
-            List<string> nameServerAddress = new List<string>();
-            nameServerAddress.Add(string.Format("{0}:{1}", host, port));
-            resolver = new StaticNameServerResolver(nameServerAddress);
-
-            credentialsProvider = new ConfigFileCredentialsProvider();
+            _accessPoint = new AccessPoint
+            {
+                Host = HOST,
+                Port = PORT
+            };
         }
 
         [ClassCleanup]
         public static void TearDown()
         {
+        }
 
+        [TestMethod]
+        public async Task TestLifecycle()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            await producer.Shutdown();
         }
 
+        [TestMethod]
+        public async Task TestSendStandardMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            
+            // Tag the message. A message has at most one tag.
+            msg.Tag = "Tag-0";
+            
+            // Associate the message with one or multiple keys
+            var keys = new List<string>();
+            keys.Add("k1");
+            keys.Add("k2");
+            msg.Keys = keys;
+            
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendMultipleMessages()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            for (var i = 0; i < 128; i++)
+            {
+                var msg = new Message(topic, body);
+            
+                // Tag the message. A message has at most one tag.
+                msg.Tag = "Tag-0";
+            
+                // Associate the message with one or multiple keys
+                var keys = new List<string>();
+                keys.Add("k1");
+                keys.Add("k2");
+                msg.Keys = keys;
+                var sendResult = await producer.Send(msg);
+                Assert.IsNotNull(sendResult);                
+            }
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendFifoMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            
+            // Messages of the same group will get delivered one after another. 
+            msg.MessageGroup = "message-group-0";
+            
+            // Verify messages are FIFO iff their message group is not null or empty.
+            Assert.IsTrue(msg.Fifo());
 
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
         [TestMethod]
-        public void testSendMessage()
+        public async Task TestSendScheduledMessage()
         {
-            var producer = new Producer(resolver, resourceNamespace);
-            producer.ResourceNamespace = resourceNamespace;
+            var producer = new Producer(_accessPoint, resourceNamespace);
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
-            producer.start();
+            await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             var msg = new Message(topic, body);
-            var sendResult = producer.send(msg).GetAwaiter().GetResult();
+            
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            Assert.IsTrue(msg.Scheduled());
+            
+            var sendResult = await producer.Send(msg);
             Assert.IsNotNull(sendResult);
-            producer.shutdown();
+            await producer.Shutdown();
         }
+        
+        
+        /**
+         * Trying to send a message that is both FIFO and Scheduled should fail.
+         */
+        [TestMethod]
+        public async Task TestSendMessage_Failure()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            msg.MessageGroup = "Group-0";
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            Assert.IsTrue(msg.Scheduled());
 
-        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+            try
+            {
+                await producer.Send(msg);
+                Assert.Fail("Should have raised an exception");
+            }
+            catch (MessageException e)
+            {
+            }
+            await producer.Shutdown();
+        }
 
-        private static string topic = "cpp_sdk_standard";
+        private static string resourceNamespace = "";
 
-        private static string clientId = "C001";
-        private static string group = "GID_cpp_sdk_standard";
+        private static string topic = "cpp_sdk_standard";
 
-        private static INameServerResolver resolver;
-        private static ICredentialsProvider credentialsProvider;
-        private static string host = "116.62.231.199";
-        private static int port = 80;
+        private static string HOST = "127.0.0.1";
+        private static int PORT = 8081;
     }
 
 }
\ No newline at end of file
diff --git a/csharp/tests/PushConsumerTest.cs b/csharp/tests/PushConsumerTest.cs
new file mode 100644
index 0000000..78f01de
--- /dev/null
+++ b/csharp/tests/PushConsumerTest.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+using System;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+
+    public class TestMessageListener : IMessageListener // listener stub registered by PushConsumerTest.testLifecycle
+    {
+        public Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                Console.WriteLine(""); // writes one empty line per delivered message; the message itself is not inspected
+            }
+            // `failed` is never touched here; the returned task is already completed.
+            return Task.CompletedTask;
+        }
+    }
+
+    public class CountableMessageListener : IMessageListener // prints the ID of every delivered message
+    {
+        public Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                // A composite format item needs an index: "{}" makes Console.WriteLine throw FormatException.
+                Console.WriteLine("{0}", message.MessageId);
+            }
+            return Task.CompletedTask; // `failed` is left untouched
+        }
+    }
+
+    [TestClass]
+    public class PushConsumerTest
+    {
+        // Tests that build a PushConsumer against the host/port fields declared at the bottom of this class.
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            credentialsProvider = new ConfigFileCredentialsProvider();
+            // NOTE(review): this field is assigned here but never read in this class; each test builds its own provider.
+        }
+
+        [ClassCleanup]
+        public static void TearDown()
+        {
+            // Nothing is released at class level.
+        }
+
+        [TestInitialize]
+        public void SetUp()
+        {
+            accessPoint = new AccessPoint();
+            accessPoint.Host = host;
+            accessPoint.Port = port;
+        }
+
+        [TestMethod]
+        public void testLifecycle()
+        {
+            var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new TestMessageListener());
+            consumer.Start();
+            // NOTE(review): the results of Start()/Shutdown() are discarded; if they return Task, failures go unobserved — confirm.
+            consumer.Shutdown();
+        }
+
+        // NOTE(review): the test below sleeps 300 seconds between Start() and Shutdown().
+        // [Ignore]
+        [TestMethod]
+        public void testConsumeMessage()
+        {
+            var consumer = new PushConsumer(accessPoint, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new CountableMessageListener());
+            consumer.Start();
+            System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
+            consumer.Shutdown();
+        }
+
+        // Fixture values. NOTE(review): they name a specific remote endpoint and instance namespace — confirm they are meant to be committed.
+        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+        private static string topic = "cpp_sdk_standard";
+
+        private static string group = "GID_cpp_sdk_standard";
+
+        private static ICredentialsProvider credentialsProvider;
+        private static string host = "116.62.231.199";
+        private static int port = 80;
+
+        private AccessPoint accessPoint;
+
+    }
+
+}
\ No newline at end of file
diff --git a/csharp/tests/RpcClientTest.cs b/csharp/tests/RpcClientTest.cs
index 5425973..a1ecf82 100644
--- a/csharp/tests/RpcClientTest.cs
+++ b/csharp/tests/RpcClientTest.cs
@@ -15,149 +15,132 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
-using Grpc.Net.Client;
-using rmq = global::apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
 using System;
-using pb = global::Google.Protobuf;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using grpc = Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
 
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
+
     [TestClass]
     public class RpcClientTest
     {
 
-
-        [ClassInitialize]
-        public static void SetUp(TestContext context)
+        [TestMethod]
+        public async Task testTelemetry()
         {
-            string target = string.Format("https://{0}:{1}", host, port);
-            var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
-            {
-                HttpHandler = ClientManager.createHttpHandler()
-            });
-            var invoker = channel.Intercept(new ClientLoggerInterceptor());
-            var client = new rmq::MessagingService.MessagingServiceClient(invoker);
-            rpcClient = new RpcClient(client);
-
-            clientConfig = new ClientConfig();
-            var credentialsProvider = new ConfigFileCredentialsProvider();
-            clientConfig.CredentialsProvider = credentialsProvider;
-            clientConfig.ResourceNamespace = resourceNamespace;
-            clientConfig.Region = "cn-hangzhou-pre";
-        }
+            Console.WriteLine("Test Telemetry streaming");
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
 
-        [ClassCleanup]
-        public static void TearDown()
-        {
+            var cmd = new rmq::TelemetryCommand();
+            cmd.Settings = new rmq::Settings();
+            cmd.Settings.ClientType = rmq::ClientType.Producer;
+            cmd.Settings.AccessPoint = new rmq::Endpoints();
+            cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
+            cmd.Settings.AccessPoint.Addresses.Add(address);
+            cmd.Settings.RequestTimeout = new Google.Protobuf.WellKnownTypes.Duration();
+            cmd.Settings.RequestTimeout.Seconds = 3;
+            cmd.Settings.RequestTimeout.Nanos = 0;
+            cmd.Settings.Publishing = new rmq::Publishing();
+            var topic = new rmq::Resource();
+            topic.Name = "cpp_sdk_standard";
+            cmd.Settings.Publishing.Topics.Add(topic);
+            cmd.Settings.UserAgent = new rmq::UA();
+            cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
+            cmd.Settings.UserAgent.Version = "1.0";
+            cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
+
+            var duplexStreaming = rpc_client.Telemetry(metadata);
+            var reader = duplexStreaming.ResponseStream;
+            var writer = duplexStreaming.RequestStream;
+
+            var cts = new CancellationTokenSource();
+            await writer.WriteAsync(cmd);
+            Console.WriteLine("Command written");
+            if (await reader.MoveNext(cts.Token))
+            {
+                var response = reader.Current;
+                switch (response.CommandCase)
+                {
+                    case rmq::TelemetryCommand.CommandOneofCase.Settings:
+                        {
+                            var responded_settings = response.Settings;
+                            Console.WriteLine($"{responded_settings.ToString()}");
+                            break;
+                        }
+                    case rmq::TelemetryCommand.CommandOneofCase.None:
+                        {
+                            Console.WriteLine($"Unknown response command type: {response.Status.ToString()}");
+                            break;
+                        }
+                }
+                Console.WriteLine("Server responded ");
+            }
+            else
+            {
+                Console.WriteLine("Server is not responding");
+                var status = duplexStreaming.GetStatus();
+                Console.WriteLine($"status={status.ToString()}");
 
+                var trailers = duplexStreaming.GetTrailers();
+                Console.WriteLine($"trailers={trailers.ToString()}");
+            }
         }
 
         [TestMethod]
         public void testQueryRoute()
         {
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace;
-            request.Topic.Name = topic;
+            request.Topic.Name = "cpp_sdk_standard";
             request.Endpoints = new rmq::Endpoints();
             request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
             var address = new rmq::Address();
-            address.Host = host;
-            address.Port = port;
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
             request.Endpoints.Addresses.Add(address);
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.queryRoute(request, callOptions).GetAwaiter().GetResult();
+            var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
+            var result = response.GetAwaiter().GetResult();
         }
 
-
         [TestMethod]
-        public void testHeartbeat()
+        public async Task TestSend()
         {
-            var request = new rmq::HeartbeatRequest();
-            request.ClientId = clientId;
-            request.ProducerData = new rmq::ProducerData();
-            request.ProducerData.Group = new rmq::Resource();
-            request.ProducerData.Group.ResourceNamespace = resourceNamespace;
-            request.ProducerData.Group.Name = topic;
-            request.FifoFlag = false;
-
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
             var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.heartbeat(request, callOptions).GetAwaiter().GetResult();
-        }
+            Signature.sign(client_config, metadata);
 
-        [TestMethod]
-        public void testSendMessage()
-        {
             var request = new rmq::SendMessageRequest();
-            request.Message = new rmq::Message();
-            byte[] body = new byte[1024];
-            for (int i = 0; i < body.Length; i++)
-            {
-                body[i] = (byte)'x';
-            }
-            request.Message.Body = pb::ByteString.CopyFrom(body);
-            request.Message.Topic = new rmq::Resource();
-            request.Message.Topic.ResourceNamespace = resourceNamespace;
-            request.Message.Topic.Name = topic;
-            request.Message.UserAttribute.Add("k", "v");
-            request.Message.UserAttribute.Add("key", "value");
-            request.Message.SystemAttribute = new rmq::SystemAttribute();
-            request.Message.SystemAttribute.Tag = "TagA";
-            request.Message.SystemAttribute.Keys.Add("key1");
-            request.Message.SystemAttribute.MessageId = SequenceGenerator.Instance.Next();
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-
-            var response = rpcClient.sendMessage(request, callOptions).GetAwaiter().GetResult();
-        }
-
-        // Remove the Ignore annotation if server has fixed
-        [Ignore]
-        [TestMethod]
-        public void testNotifyClientTermiantion()
-        {
-            var request = new rmq::NotifyClientTerminationRequest();
-            request.ClientId = clientId;
-            request.ProducerGroup = new rmq::Resource();
-            request.ProducerGroup.ResourceNamespace = resourceNamespace;
-            request.ProducerGroup.Name = group;
-
-            var metadata = new grpc::Metadata();
-            Signature.sign(clientConfig, metadata);
-
-            var deadline = DateTime.UtcNow.AddSeconds(3);
-            var callOptions = new grpc::CallOptions(metadata, deadline);
-            var response = rpcClient.notifyClientTermination(request, callOptions).GetAwaiter().GetResult();
+            var message = new rmq::Message();
+            message.Topic = new rmq::Resource();
+            message.Topic.Name = "cpp_sdk_standard";
+            message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
+            message.SystemProperties = new rmq::SystemProperties();
+            message.SystemProperties.Tag = "TagA";
+            message.SystemProperties.MessageId = "abc";
+            request.Messages.Add(message);
+            var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
+            Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
         }
-
-        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
-
-        private static string topic = "cpp_sdk_standard";
-
-        private static string clientId = "C001";
-        private static string group = "GID_cpp_sdk_standard";
-
-        private static string host = "116.62.231.199";
-        private static int port = 80;
-
-        private static IRpcClient rpcClient;
-        private static ClientConfig clientConfig;
     }
 }
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/tests/SendResultTest.cs
index 8dd033a..4e3d9a0 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/tests/SendResultTest.cs
@@ -17,28 +17,32 @@
 
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SendResultTest {
+    public class SendResultTest
+    {
 
         [TestMethod]
-        public void testCtor() {
+        public void testCtor()
+        {
             string messageId = new string("abc");
-            var sendResult = new SendResult(messageId);
+            var sendResult = new SendReceipt(messageId);
             Assert.AreEqual(messageId, sendResult.MessageId);
             Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status);
         }
 
 
         [TestMethod]
-        public void testCtor2() {
+        public void testCtor2()
+        {
             string messageId = new string("abc");
-            var sendResult = new SendResult(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
+            var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
             Assert.AreEqual(messageId, sendResult.MessageId);
             Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status);
         }
 
     }
-    
+
 }
\ No newline at end of file
diff --git a/csharp/tests/SequenceGeneratorTest.cs b/csharp/tests/SequenceGeneratorTest.cs
index fc0ceb0..9b55334 100644
--- a/csharp/tests/SequenceGeneratorTest.cs
+++ b/csharp/tests/SequenceGeneratorTest.cs
@@ -19,7 +19,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq
+namespace Org.Apache.Rocketmq
 {
     [TestClass]
     public class SequenceGeneratorTest
diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs
index cece257..16d0f46 100644
--- a/csharp/tests/SignatureTest.cs
+++ b/csharp/tests/SignatureTest.cs
@@ -19,10 +19,12 @@ using grpc = global::Grpc.Core;
 using Moq;
 using System;
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
 
     [TestClass]
-    public class SignatureTest {
+    public class SignatureTest
+    {
 
         [TestMethod]
         public void testSign()
@@ -33,7 +35,7 @@ namespace org.apache.rocketmq {
             mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
             mock.Setup(x => x.serviceName()).Returns("mq");
             mock.Setup(x => x.region()).Returns("cn-hangzhou");
-            
+
             string accessKey = "key";
             string accessSecret = "secret";
             var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/csharp/tests/SimpleConsumerTest.cs b/csharp/tests/SimpleConsumerTest.cs
new file mode 100644
index 0000000..c986614
--- /dev/null
+++ b/csharp/tests/SimpleConsumerTest.cs
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
+using Castle.Core.Logging;
+using Org.Apache.Rocketmq;
+
+namespace tests
+{
+
+    [TestClass]
+    public class SimpleConsumerTest
+    {
+        // Tests that drive a SimpleConsumer against the access point built in SetUp.
+        private static AccessPoint accessPoint;
+        private static string _resourceNamespace = "";
+        private static string _group = "GID_cpp_sdk_standard";
+        private static string _topic = "cpp_sdk_standard";
+
+        // Builds the access point (127.0.0.1:8081) shared by every test in this class.
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            accessPoint = new AccessPoint
+            {
+                Host = "127.0.0.1",
+                Port = 8081
+            };
+        }
+        // Starts a subscribed consumer, waits one second, then shuts it down.
+        [TestMethod]
+        public async Task TestLifecycle()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            await Task.Delay(1_000); // awaited instead of Thread.Sleep so the async test does not block its thread
+            await simpleConsumer.Shutdown();
+        }
+        // Expects a single Receive call to return between 1 and batchSize messages.
+        [TestMethod]
+        public async Task TestReceive()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            Assert.IsTrue(messages.Count > 0);
+            Assert.IsTrue(messages.Count <= batchSize);
+            await simpleConsumer.Shutdown();
+        }
+
+        // Acks every message returned by one Receive call. NOTE(review): nothing is asserted, so an empty batch also passes.
+        [TestMethod]
+        public async Task TestAck()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            foreach (var message in messages)
+            {
+                await simpleConsumer.Ack(message);
+                Console.WriteLine($"Ack {message.MessageId} OK");
+            }
+            await simpleConsumer.Shutdown();
+        }
+        // Requests a 10-second invisible duration for every message returned by one Receive call.
+        [TestMethod]
+        public async Task TestChangeInvisibleDuration()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            foreach (var message in messages)
+            {
+                await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(10));
+                Console.WriteLine($"ChangeInvisibleDuration for message[MsgId={message.MessageId}] OK");
+            }
+            await simpleConsumer.Shutdown();
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/StaticCredentialsProviderTest.cs b/csharp/tests/StaticCredentialsProviderTest.cs
index 20b957e..8b5f012 100644
--- a/csharp/tests/StaticCredentialsProviderTest.cs
+++ b/csharp/tests/StaticCredentialsProviderTest.cs
@@ -17,12 +17,15 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 
 
-namespace org.apache.rocketmq {
+namespace Org.Apache.Rocketmq
+{
     [TestClass]
-    public class StaticCredentialsProviderTest {
+    public class StaticCredentialsProviderTest
+    {
 
         [TestMethod]
-        public void testGetCredentials() {
+        public void testGetCredentials()
+        {
             var accessKey = "key";
             var accessSecret = "secret";
             var provider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/csharp/tests/TopicTest.cs b/csharp/tests/TopicTest.cs
index fcc15e4..9f386de 100644
--- a/csharp/tests/TopicTest.cs
+++ b/csharp/tests/TopicTest.cs
@@ -17,13 +17,16 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System.Collections.Generic;
 
-namespace org.apache.rocketmq {
-     
-     [TestClass]
-     public class TopicTest {
+namespace Org.Apache.Rocketmq
+{
 
-         [TestMethod]
-         public void testCompareTo() {
+    [TestClass]
+    public class TopicTest
+    {
+
+        [TestMethod]
+        public void testCompareTo()
+        {
             List<Topic> topics = new List<Topic>();
             topics.Add(new Topic("ns1", "t1"));
             topics.Add(new Topic("ns0", "t1"));
@@ -36,13 +39,13 @@ namespace org.apache.rocketmq {
 
             Assert.AreEqual(topics[1].ResourceNamespace, "ns0");
             Assert.AreEqual(topics[1].Name, "t1");
-            
+
 
             Assert.AreEqual(topics[2].ResourceNamespace, "ns1");
             Assert.AreEqual(topics[2].Name, "t1");
-            
+
         }
 
 
-     }
- }
\ No newline at end of file
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/UnitTest1.cs b/csharp/tests/UnitTest1.cs
index 4689e52..bbf537a 100644
--- a/csharp/tests/UnitTest1.cs
+++ b/csharp/tests/UnitTest1.cs
@@ -1,8 +1,12 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using org.apache.rocketmq;
+using Org.Apache.Rocketmq;
 using Grpc.Net.Client;
-using apache.rocketmq.v1;
+using rmq = Apache.Rocketmq.V2;
+
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
 namespace tests
 {
     [TestClass]
@@ -11,40 +15,61 @@ namespace tests
         [TestMethod]
         public void TestMethod1()
         {
-            apache.rocketmq.v1.Permission perm = apache.rocketmq.v1.Permission.None;
-            switch(perm) {
-                case apache.rocketmq.v1.Permission.None:
-                {
-                    Console.WriteLine("None");
-                    break;
-                }
-
-                case apache.rocketmq.v1.Permission.Read:
-                {
-                    Console.WriteLine("Read");
-                    break;
-                }
-
-                case apache.rocketmq.v1.Permission.Write:
-                {
-                    Console.WriteLine("Write");
-                    break;
-                }
-
-                case apache.rocketmq.v1.Permission.ReadWrite:
-                {
-                    Console.WriteLine("ReadWrite");
-                    break;
-                }
+            rmq::Permission perm = rmq::Permission.None;
+            switch (perm)
+            {
+                case rmq::Permission.None:
+                    {
+                        Console.WriteLine("None");
+                        break;
+                    }
+
+                case rmq::Permission.Read:
+                    {
+                        Console.WriteLine("Read");
+                        break;
+                    }
+
+                case rmq::Permission.Write:
+                    {
+                        Console.WriteLine("Write");
+                        break;
+                    }
+
+                case rmq::Permission.ReadWrite:
+                    {
+                        Console.WriteLine("ReadWrite");
+                        break;
+                    }
 
             }
         }
 
         [TestMethod]
-        public void TestRpcClientImplCtor() {
-            using var channel = GrpcChannel.ForAddress("https://localhost:5001");
-            var client = new MessagingService.MessagingServiceClient(channel);
-            RpcClient impl = new RpcClient(client);
+        public void TestRpcClientImplCtor()
+        {
+            RpcClient impl = new RpcClient("https://localhost:5001");
+        }
+
+        [TestMethod]
+        public void TestConcurrentDictionary()
+        {
+            var dict = new ConcurrentDictionary<string, List<String>>();
+            string s = "abc";
+            List<String> result;
+            var exists = dict.TryGetValue(s, out result);
+            Assert.IsFalse(exists);
+            Assert.IsNull(result);
+
+            result = new List<string>();
+            result.Add("abc");
+            Assert.IsTrue(dict.TryAdd(s, result));
+
+            List<String> list;
+            exists = dict.TryGetValue(s, out list);
+            Assert.IsTrue(exists);
+            Assert.IsNotNull(list);
+            Assert.AreEqual(1, list.Count);
         }
     }
 }