You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/03/04 09:33:17 UTC

[rocketmq-client-csharp] branch develop updated: Implement the first alpha version of PushConsumer, with many TODOs (#14)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new 684a558  Implement the first alpha version of PushConsumer, with many TODOs (#14)
684a558 is described below

commit 684a5580aff3ea0f2b1eb43fa4b3036a9046acd5
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Fri Mar 4 17:33:13 2022 +0800

    Implement the first alpha version of PushConsumer, with many TODOs (#14)
    
    Implement an initial version of PushConsumer
---
 rocketmq-client-csharp/Client.cs                   | 256 +++++++++++++++---
 rocketmq-client-csharp/ClientConfig.cs             |   2 +-
 rocketmq-client-csharp/ClientManager.cs            | 138 +++++++++-
 .../{IClient.cs => ExpressionType.cs}              |  16 +-
 .../{IClient.cs => FilterExpression.cs}            |  25 +-
 rocketmq-client-csharp/IClient.cs                  |   5 +-
 rocketmq-client-csharp/IClientManager.cs           |  16 +-
 .../{IProducer.cs => IConsumer.cs}                 |  13 +-
 .../{IClient.cs => IMessageListener.cs}            |  12 +-
 rocketmq-client-csharp/IProducer.cs                |   2 +-
 rocketmq-client-csharp/Message.cs                  |  13 +-
 .../{IClient.cs => ProcessQueue.cs}                |  25 +-
 rocketmq-client-csharp/Producer.cs                 |   4 +-
 rocketmq-client-csharp/PushConsumer.cs             | 291 +++++++++++++++++++++
 .../rocketmq-client-csharp.csproj                  |   1 +
 tests/ProducerTest.cs                              |   3 +-
 tests/{ProducerTest.cs => PushConsumerTest.cs}     |  64 ++++-
 tests/RpcClientTest.cs                             | 221 +++++++++++++++-
 tests/UnitTest1.cs                                 |  24 ++
 19 files changed, 1027 insertions(+), 104 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index e628d65..04a4ed3 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -20,7 +20,7 @@ using System.Collections.Concurrent;
 using System.Threading.Tasks;
 using System.Threading;
 using System;
-using Apache.Rocketmq.V1;
+using rmq = Apache.Rocketmq.V1;
 using grpc = global::Grpc.Core;
 using NLog;
 
@@ -29,7 +29,7 @@ namespace Org.Apache.Rocketmq
 {
     public abstract class Client : ClientConfig, IClient
     {
-        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         public Client(INameServerResolver resolver, string resourceNamespace)
         {
@@ -40,6 +40,8 @@ namespace Org.Apache.Rocketmq
 
             _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
+
+            _healthCheckCts = new CancellationTokenSource();
         }
 
         public virtual void Start()
@@ -55,14 +57,55 @@ namespace Org.Apache.Rocketmq
 
             }, 30, _updateTopicRouteCts.Token);
 
+            schedule(async () =>
+            {
+                await HealthCheck();
+            }, 30, _healthCheckCts.Token);
+
         }
 
-        public virtual async Task Shutdown()
+        public virtual void Shutdown()
         {
             Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
             _nameServerResolverCts.Cancel();
-            await Manager.Shutdown();
+            _healthCheckCts.Cancel(); // Cancel the token that Start() handed to the health-check schedule.
+            Manager.Shutdown().GetAwaiter().GetResult();
+        }
+        /// <summary>Returns the URL of the first routed broker that <paramref name="acceptor"/> accepts, or null.</summary>
+        protected string FilterBroker(Func<string, bool> acceptor)
+        {
+            foreach (var routeEntry in _topicRouteTable)
+            {
+                foreach (var partition in routeEntry.Value.Partitions)
+                {
+                    string brokerUrl = partition.Broker.targetUrl();
+                    if (!acceptor(brokerUrl)) { continue; }
+                    return brokerUrl;
+                }
+            }
+            // No partition in any known route satisfied the predicate.
+            return null;
+        }
+
+        /**
+         * Return the distinct endpoints of all brokers in the route table, in first-seen order.
+         */
+        private List<string> AvailableBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            // Membership is tracked in a set so de-duplication does not rescan the list each time.
+            HashSet<string> seen = new HashSet<string>();
+            foreach (var item in _topicRouteTable)
+            {
+                foreach (var partition in item.Value.Partitions)
+                {
+                    string endpoint = partition.Broker.targetUrl();
+                    // HashSet.Add returns false when the endpoint was already recorded.
+                    if (seen.Add(endpoint)) { endpoints.Add(endpoint); }
+                }
+            }
+            return endpoints;
         }
 
         private async Task UpdateNameServerList()
@@ -153,6 +196,18 @@ namespace Org.Apache.Rocketmq
             });
         }
 
+        /// <summary>Builds an IPv4 endpoint set from a "host:port" string, split at the last ':'.</summary>
+        protected rmq::Endpoints AccessEndpoint(string nameServer)
+        {
+            int pos = nameServer.LastIndexOf(':');
+            string host = nameServer.Substring(0, pos);
+            int port = Int32.Parse(nameServer.Substring(pos + 1));
+            var endpoints = new rmq::Endpoints { Scheme = rmq::AddressScheme.Ipv4 };
+            // The result carries exactly one address.
+            endpoints.Addresses.Add(new rmq::Address { Host = host, Port = port });
+            return endpoints;
+        }
+
         /**
          * Parameters:
          * topic
@@ -187,59 +242,198 @@ namespace Org.Apache.Rocketmq
                 // We got one or more name servers available.
                 int index = (_currentNameServerIndex + retry) % _nameServers.Count;
                 string nameServer = _nameServers[index];
-                var request = new QueryRouteRequest();
-                request.Topic = new Resource();
+                var request = new rmq::QueryRouteRequest();
+                request.Topic = new rmq::Resource();
                 request.Topic.ResourceNamespace = _resourceNamespace;
                 request.Topic.Name = topic;
-                request.Endpoints = new Endpoints();
-                request.Endpoints.Scheme = global::Apache.Rocketmq.V1.AddressScheme.Ipv4;
-                var address = new global::Apache.Rocketmq.V1.Address();
-                int pos = nameServer.LastIndexOf(':');
-                address.Host = nameServer.Substring(0, pos);
-                address.Port = Int32.Parse(nameServer.Substring(pos + 1));
-                request.Endpoints.Addresses.Add(address);
-                var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+                request.Endpoints = AccessEndpoint(nameServer);
                 var metadata = new grpc.Metadata();
                 Signature.sign(this, metadata);
-                var topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
-                if (null != topicRouteData)
+                string target = $"https://{nameServer}";
+                TopicRouteData topicRouteData;
+                try
                 {
-                    if (retry > 0)
+                    topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
+                    if (null != topicRouteData)
                     {
-                        _currentNameServerIndex = index;
+                        Logger.Debug($"Got route entries for {topic} from name server");
+                        _topicRouteTable.TryAdd(topic, topicRouteData);
+
+                        if (retry > 0)
+                        {
+                            _currentNameServerIndex = index;
+                        }
+                        return topicRouteData;
                     }
-                    return topicRouteData;
+                    else
+                    {
+                        Logger.Warn($"Failed to query route of {topic} from {target}");
+                    }
+                }
+                catch (System.Exception e)
+                {
+                    Logger.Warn(e, "Failed when querying route");
                 }
             }
             return null;
         }
 
-        public abstract void PrepareHeartbeatData(HeartbeatRequest request);
+        public abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
 
-        public void Heartbeat()
+        public async Task Heartbeat()
         {
-            List<string> endpoints = endpointsInUse();
+            List<string> endpoints = AvailableBrokerEndpoints();
             if (0 == endpoints.Count)
             {
+                Logger.Debug("No broker endpoints available in topic route");
                 return;
             }
 
-            var heartbeatRequest = new HeartbeatRequest();
-            PrepareHeartbeatData(heartbeatRequest);
+            var request = new rmq::HeartbeatRequest();
+            PrepareHeartbeatData(request);
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+
+            List<Task> tasks = new List<Task>();
+            foreach (var endpoint in endpoints)
+            {
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, getIoTimeout()));
+            }
+
+            await Task.WhenAll(tasks);
+        }
+
+        private List<string> BlockedBrokerEndpoints()
+        {
+            List<string> endpoints = new List<string>();
+            return endpoints;
+        }
+
+        /// <summary>Health-checks every blocked broker endpoint and calls RemoveFromBlockList for each that reports true.</summary>
+        public async Task HealthCheck()
+        {
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var request = new rmq::HealthCheckRequest();
+
+            // Start every probe before awaiting any of them, then wait for the whole batch.
+            List<string> blocked = BlockedBrokerEndpoints();
+            List<Task<Boolean>> probes = new List<Task<Boolean>>();
+            foreach (var endpoint in blocked)
+            {
+                probes.Add(Manager.HealthCheck(endpoint, metadata, request, getIoTimeout()));
+            }
+            Boolean[] healthy = await Task.WhenAll(probes);
+
+            // Task.WhenAll keeps results in task order, so healthy[i] belongs to blocked[i].
+            for (int i = 0; i < healthy.Length; ++i)
+            {
+                if (!healthy[i]) { continue; }
+                RemoveFromBlockList(blocked[i]);
+            }
+        }
+
+        private void RemoveFromBlockList(string endpoint)
+        {
+
+        }
+
+        /// <summary>Queries the load assignments of <paramref name="group"/> on <paramref name="topic"/>; a failed query yields an empty list.</summary>
+        public async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group)
+        {
+            // The always-true predicate selects the first broker the route table yields.
+            string target = FilterBroker((s) => true);
+            var request = new rmq::QueryAssignmentRequest
+            {
+                ClientId = clientId(),
+                Topic = new rmq::Resource { ResourceNamespace = _resourceNamespace, Name = topic },
+                Group = new rmq::Resource { ResourceNamespace = _resourceNamespace, Name = group },
+                Endpoints = AccessEndpoint(_nameServers[_currentNameServerIndex]),
+            };
+
+            try
+            {
+                var metadata = new grpc::Metadata();
+                Signature.sign(this, metadata);
+                return await Manager.QueryLoadAssignment(target, metadata, request, getIoTimeout());
+            }
+            catch (System.Exception e)
+            {
+                Logger.Warn(e, $"Failed to acquire load assignments from {target}");
+            }
+            // Query failed: hand back an empty assignment list instead of propagating.
+            return new List<rmq::Assignment>();
+        }
+
+        /// <summary>Builds the https URL of the broker that serves <paramref name="assignment"/>.</summary>
+        private string TargetUrl(rmq::Assignment assignment)
+        {
+            // TODO: use the first address for now.
+            var first = assignment.Partition.Broker.Endpoints.Addresses[0];
+            string host = first.Host;
+            return $"https://{host}:{first.Port}";
+        }
+
+
+        /// <summary>Receives messages for <paramref name="group"/> from the broker of <paramref name="assignment"/>, using the long-polling timeout.</summary>
+        public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+        {
+            var targetUrl = TargetUrl(assignment);
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            var request = new rmq::ReceiveMessageRequest
+            {
+                Group = new rmq::Resource { ResourceNamespace = _resourceNamespace, Name = group },
+                Partition = assignment.Partition,
+            };
+            return await Manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+        }
+
+        public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
+        {
+            var request = new rmq::AckMessageRequest();
+            request.ClientId = clientId();
+            request.ReceiptHandle = receiptHandle;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            request.MessageId = messageId;
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
+            return await Manager.Ack(target, metadata, request, getIoTimeout());
         }
 
-        public void HealthCheck()
+        public async Task<Boolean> Nack(string target, string group, string topic, string receiptHandle, String messageId)
         {
+            var request = new rmq::NackMessageRequest();
+            request.ClientId = clientId();
+            request.ReceiptHandle = receiptHandle;
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = _resourceNamespace;
+            request.Group.Name = group;
+
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = _resourceNamespace;
+            request.Topic.Name = topic;
+
+            request.MessageId = messageId;
 
+            var metadata = new grpc::Metadata();
+            Signature.sign(this, metadata);
+            return await Manager.Nack(target, metadata, request, getIoTimeout());
         }
 
         public async Task<bool> NotifyClientTermination()
         {
-            List<string> endpoints = endpointsInUse();
-            var request = new NotifyClientTerminationRequest();
+            List<string> endpoints = AvailableBrokerEndpoints();
+            var request = new rmq::NotifyClientTerminationRequest();
             request.ClientId = clientId();
 
             var metadata = new grpc.Metadata();
@@ -263,12 +457,6 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        private List<string> endpointsInUse()
-        {
-            //TODO: gather endpoints from route entries.
-            return new List<string>();
-        }
-
         protected readonly IClientManager Manager;
         
         private readonly INameServerResolver _nameServerResolver;
@@ -279,6 +467,8 @@ namespace Org.Apache.Rocketmq
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
         private readonly CancellationTokenSource _updateTopicRouteCts;
 
+        private readonly CancellationTokenSource _healthCheckCts;
+
         protected const int MaxTransparentRetry = 3;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 54052ce..b708c32 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -25,7 +25,7 @@ namespace Org.Apache.Rocketmq {
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
             this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
             this._ioTimeout = TimeSpan.FromSeconds(3);
-            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
+            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
         }
 
         public string region() {
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 43af961..8640d93 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -17,15 +17,21 @@
 
 using rmq = Apache.Rocketmq.V1;
 using System;
+using System.IO;
+using System.IO.Compression;
 using System.Threading;
 using System.Threading.Tasks;
 using grpc = Grpc.Core;
 using System.Collections.Generic;
+using System.Security.Cryptography;
+using NLog;
 
 namespace Org.Apache.Rocketmq
 {
     public class ClientManager : IClientManager
     {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
         public ClientManager()
         {
             _rpcClients = new Dictionary<string, RpcClient>();
@@ -150,12 +156,23 @@ namespace Org.Apache.Rocketmq
         {
             var rpcClient = GetRpcClient(target);
             var response = await rpcClient.Heartbeat(metadata, request, timeout);
-            if (null == response)
+            Logger.Debug($"Heartbeat to {target} response status: {response.Common.Status.ToString()}");
+            return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+        }
+
+        public async Task<Boolean> HealthCheck(string target, grpc::Metadata metadata, rmq::HealthCheckRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            try
+            {
+                var response = await rpcClient.HealthCheck(metadata, request, timeout);
+                return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+            }
+            catch (System.Exception e)
             {
+                Logger.Debug(e, $"Health-check to {target} failed");
                 return false;
             }
-
-            return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
         }
 
         public async Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata,
@@ -175,6 +192,121 @@ namespace Org.Apache.Rocketmq
             return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
         }
 
+        /// <summary>Asks <paramref name="target"/> for load assignments; throws when the response status is not Ok.</summary>
+        public async Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            rmq::QueryAssignmentResponse response = await rpcClient.QueryAssignment(metadata, request, timeout);
+
+            var status = response.Common.Status;
+            if (status.Code != (int)Google.Rpc.Code.Ok)
+            {
+                // TODO: Build exception hierarchy
+                throw new Exception($"Failed to query load assignment from server. Cause: {status.Message}");
+            }
+
+            // Copy the assignments out of the response, in order, into a list
+            // that is independent of the response object.
+            return new List<rmq::Assignment>(response.Assignments);
+        }
+
+        /// <summary>Receives messages from <paramref name="target"/> and converts each one; throws when the response status is not Ok.</summary>
+        public async Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, rmq::ReceiveMessageRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(target);
+            rmq::ReceiveMessageResponse response = await rpcClient.ReceiveMessage(metadata, request, timeout);
+            var status = response.Common.Status;
+            if (status.Code != (int)Google.Rpc.Code.Ok)
+            {
+                throw new Exception($"Failed to receive messages from {target}, cause: {status.Message}");
+            }
+
+            // Each wire message is converted with the target recorded as its source host.
+            List<Message> converted = new List<Message>();
+            foreach (var received in response.Messages) { converted.Add(convert(target, received)); }
+            return converted;
+        }
+
+        /// <summary>
+        /// Converts a wire message into a client <c>Message</c>: copies attributes, checks the body
+        /// digest (CRC32/MD5/SHA1) and inflates gzip-encoded bodies.
+        /// </summary>
+        /// <param name="sourceHost">Stored on the result as its source host.</param>
+        private Message convert(string sourceHost, rmq::Message message)
+        {
+            var msg = new Message();
+            msg.Topic = message.Topic.Name;
+            msg.messageId = message.SystemAttribute.MessageId;
+            msg.Tag = message.SystemAttribute.Tag;
+            msg._receiptHandle = message.SystemAttribute.ReceiptHandle;
+            msg._sourceHost = sourceHost;
+            msg._deliveryAttempt = message.SystemAttribute.DeliveryAttempt;
+
+            // Copy the body once; the digest check and the decoding below both reuse this array.
+            byte[] raw = message.Body.ToByteArray();
+
+            // Validate message body checksum over the bytes as received (before any inflation).
+            var digest = message.SystemAttribute.BodyDigest;
+            string actual = null;
+            if (rmq::DigestType.Crc32 == digest.Type)
+            {
+                actual = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length).ToString("X");
+            }
+            else if (rmq::DigestType.Md5 == digest.Type)
+            {
+                actual = Convert.ToHexString(MD5.HashData(raw));
+            }
+            else if (rmq::DigestType.Sha1 == digest.Type)
+            {
+                actual = Convert.ToHexString(SHA1.HashData(raw));
+            }
+            // Any other digest type is not checked, so the verified flag keeps its default.
+            if (null != actual && !digest.Checksum.Equals(actual))
+            {
+                msg._bodyChecksumVerified = false;
+            }
+
+            foreach (var entry in message.UserAttribute)
+            {
+                msg.UserProperties.Add(entry.Key, entry.Value);
+            }
+
+            foreach (var key in message.SystemAttribute.Keys)
+            {
+                msg.Keys.Add(key);
+            }
+
+            if (message.SystemAttribute.BodyEncoding == rmq::Encoding.Gzip)
+            {
+                // Inflate the body; 'using' releases all three streams even if inflation throws.
+                using var inputStream = new MemoryStream(raw);
+                using var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress);
+                using var outputStream = new MemoryStream();
+                gzipStream.CopyTo(outputStream);
+                msg.Body = outputStream.ToArray();
+            }
+            else
+            {
+                msg.Body = raw;
+            }
+
+            return msg;
+        }
+
+        public async Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+        {
+            // True only when the response status code is Ok.
+            var reply = await GetRpcClient(target).AckMessage(metadata, request, timeout);
+            return ((int)Google.Rpc.Code.Ok) == reply.Common.Status.Code;
+        }
+
+        public async Task<Boolean> Nack(string target, grpc::Metadata metadata, rmq::NackMessageRequest request, TimeSpan timeout)
+        {
+            // True only when the response status code is Ok.
+            var reply = await GetRpcClient(target).NackMessage(metadata, request, timeout);
+            return ((int)Google.Rpc.Code.Ok) == reply.Common.Status.Code;
+        }
+
         public async Task Shutdown()
         {
             _clientLock.EnterReadLock();
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/ExpressionType.cs
similarity index 82%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/ExpressionType.cs
index 7749c07..0caaf8e 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/ExpressionType.cs
@@ -14,19 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-using System.Threading.Tasks;
-
 namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
-    {
-
-        void Heartbeat();
-
-        void HealthCheck();
-
-        Task<bool> NotifyClientTermination();
 
+    public enum ExpressionType
+    {
+        TAG,
+        SQL92,
     }
+
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/FilterExpression.cs
similarity index 65%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/FilterExpression.cs
index 7749c07..0dc0b92 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/FilterExpression.cs
@@ -14,19 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-using System.Threading.Tasks;
-
 namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
+    public class FilterExpression
     {
 
-        void Heartbeat();
+        public FilterExpression(string expression, ExpressionType type)
+        {
+            _expression = expression;
+            _type = type;
+        }
+
+        private ExpressionType _type;
+
+        private string _expression;
 
-        void HealthCheck();
+        public ExpressionType Type
+        {
+            get { return _type; }
+        }
 
-        Task<bool> NotifyClientTermination();
+        public string Expression
+        {
+            get { return _expression; }
+        }
 
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 7749c07..209871f 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,15 +16,16 @@
  */
 
 using System.Threading.Tasks;
+using System;
 
 namespace Org.Apache.Rocketmq
 {
     public interface IClient : IClientConfig
     {
 
-        void Heartbeat();
+        Task Heartbeat();
 
-        void HealthCheck();
+        Task HealthCheck();
 
         Task<bool> NotifyClientTermination();
 
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 3102f60..f5538f1 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -18,7 +18,10 @@
 using Apache.Rocketmq.V1;
 using System.Threading.Tasks;
 using System;
+using System.Collections.Generic;
 using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V1;
+
 
 namespace Org.Apache.Rocketmq {
     public interface IClientManager {
@@ -26,12 +29,23 @@ namespace Org.Apache.Rocketmq {
 
         Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
 
-        Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+        Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
+
+        Task<Boolean> HealthCheck(string target, grpc::Metadata metadata, rmq::HealthCheckRequest request, TimeSpan timeout);
 
         Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
 
         Task<SendMessageResponse> SendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
 
+        Task<List<Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, QueryAssignmentRequest request, TimeSpan timeout);
+
+        Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, ReceiveMessageRequest request, TimeSpan timeout);
+
+
+        Task<Boolean> Ack(string target, grpc::Metadata metadata, AckMessageRequest request, TimeSpan timeout);
+
+        Task<Boolean> Nack(string target, grpc::Metadata metadata, NackMessageRequest request, TimeSpan timeout);
+
         Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IConsumer.cs
similarity index 82%
copy from rocketmq-client-csharp/IProducer.cs
copy to rocketmq-client-csharp/IConsumer.cs
index 43a7815..ac4d787 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,15 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq {
-    public interface IProducer {
+namespace Org.Apache.Rocketmq
+{
+    public interface IConsumer
+    {
         void Start();
 
-        Task Shutdown();
-
-        Task<SendResult> Send(Message message);
+        void Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IMessageListener.cs
similarity index 85%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/IMessageListener.cs
index 7749c07..f46efd5 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IMessageListener.cs
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
+using System.Collections.Generic;
 using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
-    {
-
-        void Heartbeat();
-
-        void HealthCheck();
 
-        Task<bool> NotifyClientTermination();
+    public interface IMessageListener
+    {
+        Task Consume(List<Message> messages, List<Message> failed);
 
     }
+
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 43a7815..8819c4f 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -21,7 +21,7 @@ namespace Org.Apache.Rocketmq {
     public interface IProducer {
         void Start();
 
-        Task Shutdown();
+        void Shutdown();
 
         Task<SendResult> Send(Message message);
     }
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 66fd47d..5cbf1aa 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -39,12 +39,23 @@ namespace Org.Apache.Rocketmq
             this.systemProperties = new Dictionary<string, string>();
         }
 
-        private string messageId;
+        internal string messageId;
         public string MessageId
         {
             get { return messageId; }
         }
 
+        internal string _receiptHandle;
+        internal string _sourceHost;
+
+        internal int _deliveryAttempt;
+        public int DeliveryAttempt
+        {
+            get { return _deliveryAttempt; }
+        }
+
+        internal bool _bodyChecksumVerified = true;
+
         private string topic;
 
         public string Topic {
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/ProcessQueue.cs
similarity index 61%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/ProcessQueue.cs
index 7749c07..1022978 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/ProcessQueue.cs
@@ -14,19 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-using System.Threading.Tasks;
-
+using System;
 namespace Org.Apache.Rocketmq
 {
-    public interface IClient : IClientConfig
+    public class ProcessQueue
     {
 
-        void Heartbeat();
+        public ProcessQueue()
+        {
+            _lastReceivedTime = DateTime.UtcNow;
+        }
+        public bool Dropped { get; set; }
+
+        private DateTime _lastReceivedTime;
 
-        void HealthCheck();
+        public DateTime LastReceiveTime
+        {
+            get { return _lastReceivedTime; }
+            set { _lastReceivedTime = value; }
+        }
 
-        Task<bool> NotifyClientTermination();
+        internal bool Expired()
+        {
+            return DateTime.UtcNow.Subtract(_lastReceivedTime).TotalMilliseconds > 30 * 1000;
+        }
 
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 388a4a6..e2e023d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq
             // More initalization
         }
 
-        public override async Task Shutdown()
+        public override void Shutdown()
         {
             // Release local resources
-            await base.Shutdown();
+            base.Shutdown();
         }
 
         public override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..382dbfb
--- /dev/null
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.Rocketmq.V1;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V1;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+    public class PushConsumer : Client, IConsumer
+    {
+        public PushConsumer(INameServerResolver resolver, string resourceNamespace, string group) : base(resolver, resourceNamespace)
+        {
+            _group = group;
+            _topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
+            _topicAssignmentsMap = new ConcurrentDictionary<string, List<rmq::Assignment>>();
+            _processQueueMap = new ConcurrentDictionary<Assignment, ProcessQueue>();
+            _scanAssignmentCTS = new CancellationTokenSource();
+            _scanExpiredProcessQueueCTS = new CancellationTokenSource();
+        }
+
+        public override void Start()
+        {
+            if (null == _messageListener)
+            {
+                throw new System.Exception("Bad configuration: message listener is required");
+            }
+            // Runs synchronously rather than as async void, so every failure below reaches the caller.
+            base.Start();
+
+            // Step-1: Resolve topic routes
+            List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                queryRouteTasks.Add(GetRouteFor(item.Key, true));
+            }
+            Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
+
+            // Step-2: Send heartbeats to all involving brokers so that we may get immediate, valid assignments.
+            Heartbeat().GetAwaiter().GetResult();
+
+            // Step-3: Scan load assignments that are assigned to current client
+            schedule(async () =>
+            {
+                await scanLoadAssignments();
+            }, 10, _scanAssignmentCTS.Token);
+
+            schedule(() =>
+            {
+                ScanExpiredProcessQueue();
+            }, 10, _scanExpiredProcessQueueCTS.Token);
+        }
+
+        public override void Shutdown()
+        {
+            _scanAssignmentCTS.Cancel();
+            _scanExpiredProcessQueueCTS.Cancel();
+            // The cancellations above signal the tokens handed to the two scans scheduled in Start();
+            // the base client is shut down afterwards.
+            base.Shutdown();
+        }
+
+        private async Task scanLoadAssignments()
+        {
+            Logger.Debug("Start to scan load assignments from server");
+            List<Task<List<Assignment>>> tasks = new List<Task<List<Assignment>>>();
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                tasks.Add(scanLoadAssignment(item.Key, _group));
+            }
+            var result = await Task.WhenAll(tasks);
+
+            foreach (var assignments in result)
+            {
+                if (assignments.Count == 0)
+                {
+                    continue;
+                }
+
+                checkAndUpdateAssignments(assignments);
+            }
+            Logger.Debug("Completed scanning load assignments");
+        }
+
+        private void ScanExpiredProcessQueue()
+        {
+            foreach (var item in _processQueueMap)
+            {
+                if (item.Value.Expired())
+                {
+                    Task.Run(async () =>
+                    {
+                        await ExecutePop0(item.Key);
+                    });
+                }
+            }
+        }
+
+        private void checkAndUpdateAssignments(List<Assignment> assignments)
+        {
+            if (assignments.Count == 0)
+            {
+                return;
+            }
+            // Reconcile the pop loops of one topic (taken from the first entry) with its latest assignments.
+            string topic = assignments[0].Partition.Topic.Name;
+
+            // Compare to generate or cancel pop-cycles
+            List<Assignment> existing;
+            _topicAssignmentsMap.TryGetValue(topic, out existing);
+            // Assignments that were not present in the previous scan get a pop loop.
+            foreach (var assignment in assignments)
+            {
+                if (null == existing || !existing.Contains(assignment))
+                {
+                    ExecutePop(assignment);
+                }
+            }
+            // Assignments from the previous scan that are gone now have their pop loop cancelled.
+            if (null != existing)
+            {
+                foreach (var assignment in existing)
+                {
+                    if (!assignments.Contains(assignment))
+                    {
+                        Logger.Info($"Stop receiving messages from {assignment.Partition.ToString()}");
+                        CancelPop(assignment);
+                    }
+                }
+            }
+            _topicAssignmentsMap[topic] = assignments; // remember this scan so the next one can detect revoked assignments
+        }
+
+        private void ExecutePop(Assignment assignment)
+        {
+            var processQueue = new ProcessQueue();
+            if (_processQueueMap.TryAdd(assignment, processQueue))
+            {
+                Task.Run(async () =>
+                {
+                    await ExecutePop0(assignment);
+                });
+            }
+        }
+
+        // Pop loop for a single assignment: receive messages, pass them to the listener,
+        // nack the ones it adds to the failed list and ack the rest. The loop ends once the
+        // assignment's process queue has been removed from the map or marked as dropped.
+        private async Task ExecutePop0(Assignment assignment)
+        {
+            Logger.Info($"Start to pop {assignment.Partition.ToString()}");
+            while (true)
+            {
+                try
+                {
+                    ProcessQueue processQueue;
+                    if (!_processQueueMap.TryGetValue(assignment, out processQueue))
+                    {
+                        break;
+                    }
+
+                    if (processQueue.Dropped)
+                    {
+                        break;
+                    }
+
+                    List<Message> messages = await base.ReceiveMessage(assignment, _group);
+                    processQueue.LastReceiveTime = System.DateTime.UtcNow;
+                    // TODO: cache message and dispatch them
+                    List<Message> failed = new List<Message>();
+                    await _messageListener.Consume(messages, failed);
+
+                    foreach (var message in failed)
+                    {
+                        await base.Nack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                    }
+
+                    foreach (var message in messages)
+                    {
+                        if (!failed.Contains(message))
+                        {
+                            bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+                            if (!success)
+                            {
+                                Logger.Warn($"Failed to ack message {message.MessageId} of topic {message.Topic}");
+                            }
+                        }
+                    }
+                }
+                catch (System.Exception e)
+                {
+                    Logger.Error($"Failed to pop or consume messages from {assignment.Partition.ToString()}: {e}");
+                    await Task.Delay(1000); // back off instead of spinning on a persistent failure
+                }
+            }
+        }
+
+        private void CancelPop(Assignment assignment)
+        {
+            if (!_processQueueMap.ContainsKey(assignment))
+            {
+                return;
+            }
+
+            ProcessQueue processQueue;
+            if (_processQueueMap.Remove(assignment, out processQueue))
+            {
+                processQueue.Dropped = true;
+            }
+        }
+
+        public override void PrepareHeartbeatData(HeartbeatRequest request)
+        {
+            request.ClientId = clientId();
+            var consumerData = new ConsumerData();
+            consumerData.ConsumeType = ConsumeMessageType.Passive;
+            consumerData.ConsumeModel = ConsumeModel.Clustering;
+            consumerData.Group = new Resource();
+            consumerData.Group.ResourceNamespace = resourceNamespace();
+            consumerData.Group.Name = _group;
+
+            foreach (var item in _topicFilterExpressionMap)
+            {
+                var sub = new SubscriptionEntry();
+                sub.Topic = new Resource();
+                sub.Topic.ResourceNamespace = _resourceNamespace;
+                sub.Topic.Name = item.Key;
+
+                sub.Expression = new rmq::FilterExpression();
+                switch (item.Value.Type)
+                {
+                    case ExpressionType.TAG:
+                        sub.Expression.Type = rmq::FilterType.Tag;
+                        break;
+                    case ExpressionType.SQL92:
+                        sub.Expression.Type = rmq::FilterType.Sql;
+                        break;
+                }
+                sub.Expression.Expression = item.Value.Expression;
+                consumerData.Subscriptions.Add(sub);
+            }
+            request.ConsumerData = consumerData;
+        }
+
+        public void Subscribe(string topic, string expression, ExpressionType type)
+        {
+            var filterExpression = new FilterExpression(expression, type);
+            _topicFilterExpressionMap[topic] = filterExpression;
+
+        }
+
+        public void RegisterListener(IMessageListener listener)
+        {
+            if (null != listener)
+            {
+                _messageListener = listener;
+            }
+        }
+
+        private string _group;
+
+        private ConcurrentDictionary<string, FilterExpression> _topicFilterExpressionMap;
+        private IMessageListener _messageListener;
+
+        private CancellationTokenSource _scanAssignmentCTS;
+
+        private ConcurrentDictionary<string, List<rmq::Assignment>> _topicAssignmentsMap;
+
+        private ConcurrentDictionary<Assignment, ProcessQueue> _processQueueMap;
+
+        private CancellationTokenSource _scanExpiredProcessQueueCTS;
+
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 7129e9f..fda655c 100644
--- a/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -11,6 +11,7 @@
   </PropertyGroup>
 
   <ItemGroup>
+    <PackageReference Include="Crc32.NET" Version="1.2.0" />
     <PackageReference Include="Google.Protobuf" Version="3.19.4" />
     <PackageReference Include="Grpc.Net.Client" Version="2.42.0" />
     <PackageReference Include="Grpc.Tools" Version="2.43.0">
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index a2ef2c7..7dde545 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -46,7 +46,6 @@ namespace Org.Apache.Rocketmq
         public void testSendMessage()
         {
             var producer = new Producer(resolver, resourceNamespace);
-            producer.ResourceNamespace = resourceNamespace;
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             producer.Start();
@@ -55,7 +54,7 @@ namespace Org.Apache.Rocketmq
             var msg = new Message(topic, body);
             var sendResult = producer.Send(msg).GetAwaiter().GetResult();
             Assert.IsNotNull(sendResult);
-            producer.Shutdown().GetAwaiter().GetResult();
+            producer.Shutdown();
         }
 
         private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
diff --git a/tests/ProducerTest.cs b/tests/PushConsumerTest.cs
similarity index 53%
copy from tests/ProducerTest.cs
copy to tests/PushConsumerTest.cs
index a2ef2c7..5250bb8 100644
--- a/tests/ProducerTest.cs
+++ b/tests/PushConsumerTest.cs
@@ -17,12 +17,37 @@
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System.Collections.Generic;
 using System;
+using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
 
+    public class TestMessageListener : IMessageListener
+    {
+        public async Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                Console.WriteLine("");
+            }
+        }
+
+    }
+
+    // Test listener that prints the id of every delivered message and never adds to the failed list.
+    public class CountableMessageListener : IMessageListener
+    {
+        public async Task Consume(List<Message> messages, List<Message> failed)
+        {
+            foreach (var message in messages)
+            {
+                Console.WriteLine("{0}", message.MessageId); // format items must be indexed; "{}" throws FormatException
+            }
+        }
+    }
+
     [TestClass]
-    public class ProducerTest
+    public class PushConsumerTest
     {
 
         [ClassInitialize]
@@ -41,23 +66,35 @@ namespace Org.Apache.Rocketmq
 
         }
 
+        [TestMethod]
+        public void testLifecycle()
+        {
+            var consumer = new PushConsumer(resolver, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new TestMessageListener());
+            consumer.Start();
 
+            consumer.Shutdown();
+        }
+
+
+        // [Ignore]
         [TestMethod]
-        public void testSendMessage()
+        public void testConsumeMessage()
         {
-            var producer = new Producer(resolver, resourceNamespace);
-            producer.ResourceNamespace = resourceNamespace;
-            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
-            producer.Region = "cn-hangzhou-pre";
-            producer.Start();
-            byte[] body = new byte[1024];
-            Array.Fill(body, (byte)'x');
-            var msg = new Message(topic, body);
-            var sendResult = producer.Send(msg).GetAwaiter().GetResult();
-            Assert.IsNotNull(sendResult);
-            producer.Shutdown().GetAwaiter().GetResult();
+            var consumer = new PushConsumer(resolver, resourceNamespace, group);
+            consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            consumer.Region = "cn-hangzhou-pre";
+            consumer.Subscribe(topic, "*", ExpressionType.TAG);
+            consumer.RegisterListener(new CountableMessageListener());
+            consumer.Start();
+            System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
+            consumer.Shutdown();
         }
 
+
         private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
 
         private static string topic = "cpp_sdk_standard";
@@ -69,6 +106,7 @@ namespace Org.Apache.Rocketmq
         private static ICredentialsProvider credentialsProvider;
         private static string host = "116.62.231.199";
         private static int port = 80;
+
     }
 
 }
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 40e2b64..c66318d 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
-using Grpc.Net.Client;
 using rmq = global::Apache.Rocketmq.V1;
 using grpc = global::Grpc.Core;
 using System;
 using pb = global::Google.Protobuf;
+using System.Collections.Generic;
+using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
@@ -65,8 +64,20 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
-            
-            var response = rpcClient.QueryRoute(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+
+            // Execute route query multiple times.
+            List<Task<rmq::QueryRouteResponse>> tasks = new List<Task<rmq.QueryRouteResponse>>();
+            for (int i = 0; i < 16; i++)
+            {
+                tasks.Add(rpcClient.QueryRoute(metadata, request, clientConfig.getIoTimeout()));
+            }
+
+            // Verify
+            var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+            foreach (var item in result)
+            {
+                Assert.AreEqual(0, item.Common.Status.Code);
+            }
         }
 
 
@@ -85,6 +96,7 @@ namespace Org.Apache.Rocketmq
             Signature.sign(clientConfig, metadata);
             
             var response = rpcClient.Heartbeat(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            Assert.AreEqual("ok", response.Common.Status.Message);
         }
 
         [TestMethod]
@@ -114,6 +126,202 @@ namespace Org.Apache.Rocketmq
             var response = rpcClient.SendMessage(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
         }
 
+        [TestMethod]
+        public void testHealthCheck()
+        {
+            var request = new rmq::HealthCheckRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = resourceNamespace;
+            request.Group.Name = group;
+            request.ClientHost = "test";
+            var metadata = new grpc::Metadata();
+            Signature.sign(clientConfig, metadata);
+            var response = rpcClient.HealthCheck(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            Assert.AreEqual("ok", response.Common.Status.Message);
+        }
+
+        [TestMethod]
+        public void HeartbeatAsConsumer()
+        {
+            var request = new rmq::HeartbeatRequest();
+            request.ClientId = clientId;
+            request.ConsumerData = new rmq::ConsumerData();
+            request.ConsumerData.Group = new rmq::Resource();
+            request.ConsumerData.Group.ResourceNamespace = resourceNamespace;
+            request.ConsumerData.Group.Name = group;
+
+            request.ConsumerData.ConsumeModel = rmq::ConsumeModel.Clustering;
+            request.ConsumerData.ConsumePolicy = rmq::ConsumePolicy.Resume;
+            request.ConsumerData.ConsumeType = rmq::ConsumeMessageType.Passive;
+
+            var subscription = new rmq::SubscriptionEntry();
+            subscription.Topic = new rmq::Resource();
+            subscription.Topic.ResourceNamespace = resourceNamespace;
+            subscription.Topic.Name = topic;
+            subscription.Expression = new rmq::FilterExpression();
+            subscription.Expression.Type = rmq::FilterType.Tag;
+            subscription.Expression.Expression = "*";
+            request.ConsumerData.Subscriptions.Add(subscription);
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(clientConfig, metadata);
+
+            var response = rpcClient.Heartbeat(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            Assert.AreEqual("ok", response.Common.Status.Message);
+        }
+
+        private rmq::QueryAssignmentResponse queryAssignments()
+        {
+            HeartbeatAsConsumer();
+            var request = new rmq::QueryAssignmentRequest();
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Host = host;
+            address.Port = port;
+            request.Endpoints.Addresses.Add(address);
+
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = resourceNamespace;
+            request.Group.Name = group;
+
+            request.ClientId = clientId;
+            request.Topic = new rmq::Resource();
+            request.Topic.ResourceNamespace = resourceNamespace;
+            request.Topic.Name = topic;
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(clientConfig, metadata);
+
+            var response = rpcClient.QueryAssignment(metadata, request, System.TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+            return response;
+        }
+
+        [TestMethod]
+        public void testQueryAssignment()
+        {
+            var response = queryAssignments();
+
+            Assert.AreEqual("ok", response.Common.Status.Message);
+            Assert.IsTrue(response.Assignments.Count > 0);
+        }
+
+        private rmq::ReceiveMessageResponse DoReceiveMessage()
+        {
+            var assignmentsResponse = queryAssignments();
+            Assert.IsTrue(assignmentsResponse.Assignments.Count > 0);
+            var assignment = assignmentsResponse.Assignments[0];
+
+            // Send some prior messages 
+            for (int i = 0; i < batchSize; i++)
+            {
+                testSendMessage();
+            }
+
+            var request = new rmq::ReceiveMessageRequest();
+            request.Group = new rmq::Resource();
+            request.Group.ResourceNamespace = resourceNamespace;
+            request.Group.Name = group;
+
+            request.ClientId = clientId;
+            request.Partition = assignment.Partition;
+
+            request.FilterExpression = new rmq::FilterExpression();
+            request.FilterExpression.Type = rmq::FilterType.Tag;
+            request.FilterExpression.Expression = "*";
+
+            request.ConsumePolicy = rmq::ConsumePolicy.Resume;
+            request.BatchSize = batchSize;
+
+            request.InvisibleDuration = new Google.Protobuf.WellKnownTypes.Duration();
+            request.InvisibleDuration.Seconds = 10;
+            request.InvisibleDuration.Nanos = 0;
+
+            request.AwaitTime = new Google.Protobuf.WellKnownTypes.Duration();
+            request.AwaitTime.Seconds = 10;
+            request.AwaitTime.Nanos = 0;
+
+            var metadata = new grpc::Metadata();
+            Signature.sign(clientConfig, metadata);
+            var response = rpcClient.ReceiveMessage(metadata, request, TimeSpan.FromSeconds(15)).GetAwaiter().GetResult();
+            return response;
+        }
+
+        [TestMethod]
+        public void testReceiveMessage()
+        {
+            var response = DoReceiveMessage();
+            Assert.AreEqual(0, response.Common.Status.Code);
+            Assert.IsTrue(response.Messages.Count > 0);
+        }
+
+        [TestMethod]
+        public void testAck()
+        {
+            var receiveMessageResponse = DoReceiveMessage();
+
+            List<Task<rmq::AckMessageResponse>> tasks = new List<Task<rmq.AckMessageResponse>>();
+
+            foreach (var message in receiveMessageResponse.Messages)
+            {
+                var request = new rmq::AckMessageRequest();
+                request.Topic = new rmq::Resource();
+                request.Topic.ResourceNamespace = resourceNamespace;
+                request.Topic.Name = topic;
+
+                request.Group = new rmq::Resource();
+                request.Group.ResourceNamespace = resourceNamespace;
+                request.Group.Name = group;
+
+                request.ClientId = clientId;
+
+                request.ReceiptHandle = message.SystemAttribute.ReceiptHandle;
+                request.MessageId = message.SystemAttribute.MessageId;
+                var metadata = new grpc::Metadata();
+                Signature.sign(clientConfig, metadata);
+                tasks.Add(rpcClient.AckMessage(metadata, request, TimeSpan.FromSeconds(3)));
+            }
+            var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+            foreach (var item in result)
+            {
+                Assert.AreEqual("ok", item.Common.Status.Message);
+            }
+        }
+
+        [TestMethod]
+        public void testNack()
+        {
+            var receiveMessageResponse = DoReceiveMessage();
+            List<Task<rmq::NackMessageResponse>> tasks = new List<Task<rmq.NackMessageResponse>>();
+            foreach (var message in receiveMessageResponse.Messages)
+            {
+                var request = new rmq::NackMessageRequest();
+                request.Topic = new rmq::Resource();
+                request.Topic.ResourceNamespace = resourceNamespace;
+                request.Topic.Name = topic;
+
+                request.Group = new rmq::Resource();
+                request.Group.ResourceNamespace = resourceNamespace;
+                request.Group.Name = group;
+
+                request.ClientId = clientId;
+
+                request.ReceiptHandle = message.SystemAttribute.ReceiptHandle;
+                request.MessageId = message.SystemAttribute.MessageId;
+                request.DeliveryAttempt = 1;
+                request.MaxDeliveryAttempts = 16;
+
+                var metadata = new grpc::Metadata();
+                Signature.sign(clientConfig, metadata);
+                tasks.Add(rpcClient.NackMessage(metadata, request, TimeSpan.FromSeconds(3)));
+            }
+            var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+            foreach (var item in result)
+            {
+                Assert.AreEqual(0, item.Common.Status.Code);
+            }
+        }
+
         // Remove the Ignore annotation if server has fixed
         [Ignore]
         [TestMethod]
@@ -141,7 +349,10 @@ namespace Org.Apache.Rocketmq
         private static string host = "116.62.231.199";
         private static int port = 80;
 
+        private static int batchSize = 32;
+
         private static IRpcClient rpcClient;
+
         private static ClientConfig clientConfig;
     }
 }
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index acbad60..e622616 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -4,6 +4,9 @@ using Grpc.Net.Client;
 using Apache.Rocketmq.V1;
 
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
 namespace tests
 {
     [TestClass]
@@ -45,5 +48,26 @@ namespace tests
         public void TestRpcClientImplCtor() {
             RpcClient impl = new RpcClient("https://localhost:5001");
         }
+
+        [TestMethod]
+        public void TestConcurrentDictionary()
+        {
+            var dict = new ConcurrentDictionary<string, List<String>>();
+            string s = "abc";
+            List<String> result;
+            var exists = dict.TryGetValue(s, out result);
+            Assert.IsFalse(exists);
+            Assert.IsNull(result);
+
+            result = new List<string>();
+            result.Add("abc");
+            Assert.IsTrue(dict.TryAdd(s, result));
+
+            List<String> list;
+            exists = dict.TryGetValue(s, out list);
+            Assert.IsTrue(exists);
+            Assert.IsNotNull(list);
+            Assert.AreEqual(1, list.Count);
+        }
     }
 }