You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:47 UTC

[rocketmq-clients] 07/28: Implement simple consumer

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 26bb67c51c279013fabd32e4eeb7809aaf9e4f56
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 13 17:13:57 2023 +0800

    Implement simple consumer
---
 csharp/examples/ProducerDelayMessageExample.cs     |  77 +++++----
 csharp/examples/ProducerNormalMessageExample.cs    |   6 +-
 csharp/examples/SimpleConsumerExample.cs           |  68 ++++----
 csharp/rocketmq-client-csharp/Address.cs           |  25 ++-
 csharp/rocketmq-client-csharp/AddressScheme.cs     |  10 +-
 csharp/rocketmq-client-csharp/Broker.cs            |  10 ++
 csharp/rocketmq-client-csharp/Client.cs            |  25 +--
 csharp/rocketmq-client-csharp/ClientManager.cs     |  33 ++--
 csharp/rocketmq-client-csharp/Consumer.cs          |  99 +++++++++++
 .../rocketmq-client-csharp/Error/StatusChecker.cs  |  41 -----
 csharp/rocketmq-client-csharp/FilterExpression.cs  |   6 +
 csharp/rocketmq-client-csharp/IClientManager.cs    |  33 ++++
 csharp/rocketmq-client-csharp/MessageQueue.cs      |  27 ++-
 csharp/rocketmq-client-csharp/MessageView.cs       |  44 ++---
 csharp/rocketmq-client-csharp/MqEncoding.cs        |  10 +-
 csharp/rocketmq-client-csharp/Permission.cs        |  22 ++-
 csharp/rocketmq-client-csharp/Producer.cs          |   8 +-
 csharp/rocketmq-client-csharp/Publishing.cs        |   4 +-
 .../PublishingLoadBalancer.cs                      |  33 ++--
 .../rocketmq-client-csharp/PublishingSettings.cs   |  17 +-
 .../{IProducer.cs => ReceiveMessageResult.cs}      |  14 +-
 csharp/rocketmq-client-csharp/Resource.cs          |   8 +-
 csharp/rocketmq-client-csharp/RpcClient.cs         |  42 ++---
 csharp/rocketmq-client-csharp/Settings.cs          |  10 +-
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 192 +++++++++++++++++++++
 .../SimpleSubscriptionSettings.cs                  |  95 ++++++++++
 csharp/rocketmq-client-csharp/StatusChecker.cs     |   1 +
 .../SubscriptionLoadBalancer.cs                    |  38 ++--
 csharp/rocketmq-client-csharp/TopicRouteData.cs    |  10 +-
 csharp/rocketmq-client-csharp/Utilities.cs         |   5 +-
 csharp/tests/UnitTest1.cs                          |  51 +++---
 31 files changed, 748 insertions(+), 316 deletions(-)

diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index 27e32e76..9ad5fbb9 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -24,49 +24,50 @@ using Org.Apache.Rocketmq;
 
 namespace examples
 {
-    static class ProducerDelayMessageExample
+    internal static class ProducerDelayMessageExample
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         internal static async Task QuickStart()
         {
-            // string accessKey = "yourAccessKey";
-            // string secretKey = "yourSecretKey";
-            // // Credential provider is optional for client configuration.
-            // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
-            // string endpoints = "foobar.com:8080";
-            // // In most case, you don't need to create too many producers, single pattern is recommended.
-            // var producer = new Producer(endpoints)
-            // {
-            //     CredentialsProvider = credentialsProvider
-            // };
-            // string topic = "yourDelayTopic";
-            // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
-            // // the topic route before message publishing.
-            // producer.AddTopicOfInterest(topic);
-            //
-            // await producer.Start();
-            // // Define your message body.
-            // var bytes = Encoding.UTF8.GetBytes("foobar");
-            // string tag = "yourMessageTagA";
-            // // You could set multiple keys for the single message.
-            // var keys = new List<string>
-            // {
-            //     "yourMessageKey-2f00df144e48",
-            //     "yourMessageKey-49df1dd332b7"
-            // };
-            // // Set topic for current message.
-            // var message = new Message(topic, bytes)
-            // {
-            //     Tag = tag,
-            //     Keys = keys,
-            //     // Essential for DELAY message.
-            //     DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30)
-            // };
-            // var sendReceipt = await producer.Send(message);
-            // Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
-            // // Close the producer if you don't need it anymore.
-            // await producer.Shutdown();
+            const string accessKey = "yourAccessKey";
+            const string secretKey = "yourSecretKey";
+            // Credential provider is optional for client configuration.
+            var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+            const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+            var clientConfig = new ClientConfig(endpoints)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            // In most cases, you don't need to create many producers; the singleton pattern is recommended.
+            var producer = new Producer(clientConfig);
+            const string topic = "yourDelayTopic";
+            // Set the topic name(s), which is optional but recommended. It lets the producer prefetch
+            // the topic route before message publishing.
+            producer.SetTopics(topic);
+
+            await producer.Start();
+            // Define your message body.
+            var bytes = Encoding.UTF8.GetBytes("foobar");
+            const string tag = "yourMessageTagA";
+            // You could set multiple keys for the single message.
+            var keys = new List<string>
+            {
+                "yourMessageKey-2f00df144e48",
+                "yourMessageKey-49df1dd332b7"
+            };
+            // Set topic for current message.
+            var message = new Message(topic, bytes)
+            {
+                Tag = tag,
+                Keys = keys,
+                // Essential for DELAY message.
+                DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30)
+            };
+            var sendReceipt = await producer.Send(message);
+            Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+            // Close the producer if you don't need it anymore.
+            await producer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 16791a13..3274ade2 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -32,6 +32,7 @@ namespace examples
         {
             const string accessKey = "5jFk0wK7OU6Uq395";
             const string secretKey = "V1u8z19URHs4o6RQ";
+
             // Credential provider is optional for client configuration.
             var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
             const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
@@ -41,12 +42,11 @@ namespace examples
             };
             // In most case, you don't need to create too many producers, single pattern is recommended.
             var producer = new Producer(clientConfig);
-            
+
             const string topic = "lingchu_normal_topic";
             producer.SetTopics(topic);
             // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
             // the topic route before message publishing.
-
             await producer.Start();
             // Define your message body.
             var bytes = Encoding.UTF8.GetBytes("foobar");
@@ -67,7 +67,7 @@ namespace examples
             Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
             Thread.Sleep(9999999);
             // Close the producer if you don't need it anymore.
-            // await producer.Shutdown();
+            await producer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs
index 80299fbb..b41125c8 100644
--- a/csharp/examples/SimpleConsumerExample.cs
+++ b/csharp/examples/SimpleConsumerExample.cs
@@ -23,47 +23,43 @@ using Org.Apache.Rocketmq;
 
 namespace examples
 {
-    static class SimpleConsumerExample
+    internal static class SimpleConsumerExample
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         internal static async Task QuickStart()
         {
-            // string accessKey = "yourAccessKey";
-            // string secretKey = "yourSecretKey";
-            // // Credential provider is optional for client configuration.
-            // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
-            // string endpoints = "foobar.com:8080";
-            //
-            // string consumerGroup = "yourConsumerGroup";
-            // SimpleConsumer simpleConsumer = new SimpleConsumer(endpoints, consumerGroup)
-            // {
-            //     CredentialsProvider = credentialsProvider
-            // };
-            //
-            // string topic = "yourTopic";
-            // string tag = "tagA";
-            // // Set topic subscription for consumer.
-            // simpleConsumer.Subscribe(topic, new FilterExpression(tag, ExpressionType.Tag));
-            // await simpleConsumer.Start();
-            //
-            // int maxMessageNum = 16;
-            // TimeSpan invisibleDuration = TimeSpan.FromSeconds(15);
-            // var messages = await simpleConsumer.Receive(maxMessageNum, invisibleDuration);
-            // Logger.Info($"{messages.Count} messages has been received.");
-            //
-            // var tasks = new List<Task>();
-            // foreach (var message in messages)
-            // {
-            //     Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}.");
-            //     var task = simpleConsumer.Ack(message);
-            //     tasks.Add(task);
-            // }
-            //
-            // await Task.WhenAll(tasks);
-            // Logger.Info($"{tasks.Count} messages have been acknowledged.");
-            // // Close the consumer if you don't need it anymore.
-            // await simpleConsumer.Shutdown();
+            const string accessKey = "yourAccessKey";
+            const string secretKey = "yourSecretKey";
+
+            // Credential provider is optional for client configuration.
+            var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+            const string endpoints = "foobar.com:8080";
+            var clientConfig = new ClientConfig(endpoints)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            // Add your subscriptions. NOTE(review): the key is presumably meant to be a topic name, not the group.
+            const string consumerGroup = "yourConsumerGroup";
+            var subscription = new Dictionary<string, FilterExpression>
+                { { consumerGroup, new FilterExpression("*") } };
+            // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
+            var simpleConsumer =
+                new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription);
+
+            await simpleConsumer.Start();
+            var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+            foreach (var message in messageViews)
+            {
+                Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}");
+                await simpleConsumer.Ack(message);
+                Logger.Info($"Message is acknowledged successfully, message-id={message.MessageId}");
+                // await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(15));
+                // Logger.Info($"Changing message invisible duration successfully, message-id={message.MessageId}");
+            }
+
+            // Close the consumer if you don't need it anymore.
+            await simpleConsumer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Address.cs b/csharp/rocketmq-client-csharp/Address.cs
index fca83530..a66a6369 100644
--- a/csharp/rocketmq-client-csharp/Address.cs
+++ b/csharp/rocketmq-client-csharp/Address.cs
@@ -1,11 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 using System;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
     public class Address : IEquatable<Address>
     {
-        public Address(rmq.Address address)
+        public Address(Proto.Address address)
         {
             Host = address.Host;
             Port = address.Port;
@@ -21,9 +38,9 @@ namespace Org.Apache.Rocketmq
 
         public int Port { get; }
 
-        public rmq.Address ToProtobuf()
+        public Proto.Address ToProtobuf()
         {
-            return new rmq.Address
+            return new Proto.Address
             {
                 Host = Host,
                 Port = Port
diff --git a/csharp/rocketmq-client-csharp/AddressScheme.cs b/csharp/rocketmq-client-csharp/AddressScheme.cs
index 6f36f546..06a5ea32 100644
--- a/csharp/rocketmq-client-csharp/AddressScheme.cs
+++ b/csharp/rocketmq-client-csharp/AddressScheme.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
@@ -28,17 +28,17 @@ namespace Org.Apache.Rocketmq
 
     public static class AddressSchemeHelper
     {
-        public static rmq.AddressScheme ToProtobuf(AddressScheme scheme)
+        public static Proto.AddressScheme ToProtobuf(AddressScheme scheme)
         {
             switch (scheme)
             {
                 case AddressScheme.Ipv4:
-                    return rmq.AddressScheme.Ipv4;
+                    return Proto.AddressScheme.Ipv4;
                 case AddressScheme.Ipv6:
-                    return rmq.AddressScheme.Ipv6;
+                    return Proto.AddressScheme.Ipv6;
                 case AddressScheme.DomainName:
                 default:
-                    return rmq.AddressScheme.DomainName;
+                    return Proto.AddressScheme.DomainName;
             }
         }
     }
diff --git a/csharp/rocketmq-client-csharp/Broker.cs b/csharp/rocketmq-client-csharp/Broker.cs
index 6b426a5a..6a2f957a 100644
--- a/csharp/rocketmq-client-csharp/Broker.cs
+++ b/csharp/rocketmq-client-csharp/Broker.cs
@@ -31,5 +31,15 @@ namespace Org.Apache.Rocketmq
         public string Name { get; }
         public int Id { get; }
         public Endpoints Endpoints { get; }
+
+        public Proto.Broker ToProtobuf()
+        {
+            return new Proto.Broker()
+            {
+                Name = Name,
+                Id = Id,
+                Endpoints = Endpoints.ToProtobuf()
+            };
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 51ecb936..3b260022 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -44,8 +44,8 @@ namespace Org.Apache.Rocketmq
         protected readonly IClientManager ClientManager;
         protected readonly string ClientId;
 
-        protected readonly ConcurrentDictionary<string, bool> Topics;
-        
+        protected readonly ICollection<string> Topics;
+
         protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache;
         private readonly CancellationTokenSource _telemetryCts;
@@ -53,7 +53,7 @@ namespace Org.Apache.Rocketmq
         private readonly Dictionary<Endpoints, Session> _sessionsTable;
         private readonly ReaderWriterLockSlim _sessionLock;
 
-        protected Client(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics)
+        protected Client(ClientConfig clientConfig, ICollection<string> topics)
         {
             ClientConfig = clientConfig;
             Topics = topics;
@@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq
             ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, _topicRouteUpdateCtx.Token);
             ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, _heartbeatCts.Token);
             ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, _settingsSyncCtx.Token);
-            foreach (var topic in Topics.Keys)
+            foreach (var topic in Topics)
             {
                 await FetchTopicRoute(topic);
             }
@@ -135,7 +135,7 @@ namespace Org.Apache.Rocketmq
         protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
 
 
-        protected abstract void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData);
+        protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);
 
 
         private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
@@ -159,7 +159,7 @@ namespace Org.Apache.Rocketmq
             }
 
             _topicRouteCache[topic] = topicRouteData;
-            OnTopicRouteDataFetched0(topic, topicRouteData);
+            OnTopicRouteDataUpdated0(topic, topicRouteData);
         }
 
 
@@ -183,7 +183,7 @@ namespace Org.Apache.Rocketmq
         private async void UpdateTopicRouteCache()
         {
             Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
-            foreach (var topic in Topics.Keys)
+            foreach (var topic in Topics)
             {
                 var topicRouteData = await FetchTopicRoute(topic);
                 _topicRouteCache[topic] = topicRouteData;
@@ -250,6 +250,7 @@ namespace Org.Apache.Rocketmq
                 Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
                              $"statusMessage={response.Status.Message}");
             }
+
             StatusChecker.Check(response.Status, request);
 
             var messageQueues = response.MessageQueues.ToList();
@@ -266,13 +267,14 @@ namespace Org.Apache.Rocketmq
             }
 
             var request = WrapHeartbeatRequest();
-            Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new ();
+            Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
             // Collect task into a map.
             foreach (var item in endpoints)
             {
                 var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
-                responses[item]= task;
+                responses[item] = task;
             }
+
             foreach (var item in responses.Keys)
             {
                 var response = await responses[item];
@@ -291,11 +293,10 @@ namespace Org.Apache.Rocketmq
                 }
 
                 var statusMessage = response.Status.Message;
-                Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+                Logger.Info(
+                    $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
             }
         }
-        
-        
 
 
         public grpc.Metadata Sign()
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index 3eef2fe6..967a2ac1 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 using System;
 using System.Threading;
 using System.Threading.Tasks;
@@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq
             _clientLock.EnterReadLock();
             try
             {
-                List<Task> tasks = new List<Task>();
+                var tasks = new List<Task>();
                 foreach (var item in _rpcClients)
                 {
                     tasks.Add(item.Value.Shutdown());
@@ -92,56 +92,57 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(
+        public grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(
             Endpoints endpoints)
         {
             return GetRpcClient(endpoints).Telemetry(_client.Sign());
         }
 
-        public async Task<rmq.QueryRouteResponse> QueryRoute(Endpoints endpoints, rmq.QueryRouteRequest request,
+        public async Task<Proto.QueryRouteResponse> QueryRoute(Endpoints endpoints, Proto.QueryRouteRequest request,
             TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).QueryRoute(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq.HeartbeatResponse> Heartbeat(Endpoints endpoints, rmq.HeartbeatRequest request,
+        public async Task<Proto.HeartbeatResponse> Heartbeat(Endpoints endpoints, Proto.HeartbeatRequest request,
             TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).Heartbeat(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
-            rmq.NotifyClientTerminationRequest request, TimeSpan timeout)
+        public async Task<Proto.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
+            Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).NotifyClientTermination(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq::SendMessageResponse> SendMessage(Endpoints endpoints, rmq::SendMessageRequest request,
+        public async Task<Proto::SendMessageResponse> SendMessage(Endpoints endpoints,
+            Proto::SendMessageRequest request,
             TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).SendMessage(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
-            rmq.QueryAssignmentRequest request, TimeSpan timeout)
+        public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
+            Proto.QueryAssignmentRequest request, TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).QueryAssignment(_client.Sign(), request, timeout);
         }
 
-        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
-            rmq.ReceiveMessageRequest request, TimeSpan timeout)
+        public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
+            Proto.ReceiveMessageRequest request, TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).ReceiveMessage(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq::AckMessageResponse> AckMessage(Endpoints endpoints,
-            rmq.AckMessageRequest request, TimeSpan timeout)
+        public async Task<Proto::AckMessageResponse> AckMessage(Endpoints endpoints,
+            Proto.AckMessageRequest request, TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).AckMessage(_client.Sign(), request, timeout);
         }
 
-        public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
-            rmq.ChangeInvisibleDurationRequest request, TimeSpan timeout)
+        public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
+            Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
         {
             return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
         }
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
new file mode 100644
index 00000000..25df84d3
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    public abstract class Consumer : Client
+    {
+        protected readonly string ConsumerGroup;
+
+        protected Consumer(ClientConfig clientConfig, string consumerGroup, ICollection<string> topics) : base(
+            clientConfig, topics)
+        {
+            ConsumerGroup = consumerGroup;
+        }
+        
+        protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
+            TimeSpan awaitDuration)
+        {
+            var tolerance = ClientConfig.RequestTimeout;
+            var timeout = tolerance.Add(awaitDuration);
+            var response = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
+            var status = new Proto.Status()
+            {
+                Code = Proto.Code.InternalServerError,
+                Message = "Status was not set by server"
+            };
+            var messageList = new List<Proto.Message>();
+            foreach (var entry in response)
+            {
+                switch (entry.ContentCase)
+                {
+                    case Proto.ReceiveMessageResponse.ContentOneofCase.Status:
+                        status = entry.Status;
+                        break;
+                    case Proto.ReceiveMessageResponse.ContentOneofCase.Message:
+                        messageList.Add(entry.Message);
+                        break;
+                    case Proto.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
+                    case Proto.ReceiveMessageResponse.ContentOneofCase.None:
+                    default:
+                        break;
+                }
+            }
+
+            var messages = messageList.Select(message => MessageView.FromProtobuf(message, mq)).ToList();
+            StatusChecker.Check(status, request);
+            return new ReceiveMessageResult(mq.Broker.Endpoints, messages);
+        }
+
+        private static Proto.FilterExpression WrapFilterExpression(FilterExpression filterExpression)
+        {
+            var filterType = Proto.FilterType.Tag;
+            if (ExpressionType.Sql92.Equals(filterExpression.Type))
+            {
+                filterType = Proto.FilterType.Sql;
+            }
+
+            return new Proto.FilterExpression
+            {
+                Type = filterType,
+                Expression = filterExpression.Expression
+            };
+        }
+
+        protected static Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+            FilterExpression filterExpression, TimeSpan invisibleDuration)
+        {
+            return new Proto.ReceiveMessageRequest()
+            {
+                MessageQueue = mq.ToProtobuf(),
+                FilterExpression = WrapFilterExpression(filterExpression),
+                BatchSize = batchSize,
+                AutoRenew = false,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs b/csharp/rocketmq-client-csharp/Error/StatusChecker.cs
deleted file mode 100644
index 55f56d57..00000000
--- a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs
+++ /dev/null
@@ -1,41 +0,0 @@
-using Apache.Rocketmq.V2;
-using Google.Protobuf;
-
-namespace Org.Apache.Rocketmq.Error
-{
-    public class StatusChecker
-    {
-        public static void check(Status status, IMessage message)
-        {
-            // var code = status.Code;
-            // switch (code)
-            // {
-            //     case Code.Ok:
-            //     case Code.MultipleResults:
-            //         return;
-            //     case Code.BadRequest:
-            //     case Code.IllegalAccessPoint:
-            //     case Code.IllegalTopic:
-            //     case Code.IllegalConsumerGroup:
-            //     case Code.IllegalMessageTag:
-            //     case Code.IllegalMessageKey:
-            //     case Code.IllegalMessageGroup:
-            //     case Code.IllegalMessagePropertyKey:
-            //     case Code.InvalidTransactionId:
-            //     case Code.IllegalMessageId:
-            //     case Code.IllegalFilterExpression:
-            //     case Code.IllegalInvisibleTime:
-            //     case Code.IllegalDeliveryTime:
-            //     case Code.InvalidReceiptHandle:
-            //     case Code.MessagePropertyConflictWithType:
-            //     case Code.UnrecognizedClientType:
-            //     case Code.MessageCorrupted:
-            //     case Code.ClientIdRequired:
-            //     case Code.IllegalPollingTime:
-            //         throw new BadRequestException(code)
-            //
-            //     case ILLEGAL_POLLING_TIME:
-            // }
-        }
-    }
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/FilterExpression.cs b/csharp/rocketmq-client-csharp/FilterExpression.cs
index 3bd432da..07a4af5a 100644
--- a/csharp/rocketmq-client-csharp/FilterExpression.cs
+++ b/csharp/rocketmq-client-csharp/FilterExpression.cs
@@ -25,6 +25,12 @@ namespace Org.Apache.Rocketmq
             Type = type;
         }
 
+        public FilterExpression(string expression)
+        {
+            Expression = expression;
+            Type = ExpressionType.Tag;
+        }
+
         public ExpressionType Type { get; }
         public string Expression { get; }
     }
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index f2e48e36..df2035ab 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -25,15 +25,48 @@ namespace Org.Apache.Rocketmq
 {
     public interface IClientManager
     {
+        /// <summary>
+        /// Establish a telemetry channel between client and remote endpoints.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <returns>gRPC bi-directional stream.</returns>
         AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints);
 
+        /// <summary>
+        /// Query topic route info from remote.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <param name="request">gRPC request of querying topic route.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns>Task of response.</returns>
         Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout);
 
+        /// <summary>
+        /// Send heartbeat to remote endpoints.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <param name="request">gRPC request of heartbeat.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns>Task of response.</returns>
         Task<HeartbeatResponse> Heartbeat(Endpoints endpoints, HeartbeatRequest request, TimeSpan timeout);
 
+        /// <summary>
+        /// Notify client's termination.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <param name="request">gRPC request of notifying client's termination.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns>Task of response.</returns>
         Task<NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
             NotifyClientTerminationRequest request, TimeSpan timeout);
 
+        /// <summary>
+        /// Send message to remote endpoints.
+        /// </summary>
+        /// <param name="endpoints">The target endpoints.</param>
+        /// <param name="request">gRPC request of sending message.</param>
+        /// <param name="timeout">Request max duration.</param>
+        /// <returns>Task of response.</returns>
         Task<SendMessageResponse> SendMessage(Endpoints endpoints, SendMessageRequest request,
             TimeSpan timeout);
 
diff --git a/csharp/rocketmq-client-csharp/MessageQueue.cs b/csharp/rocketmq-client-csharp/MessageQueue.cs
index cd6f0ce3..580fbafe 100644
--- a/csharp/rocketmq-client-csharp/MessageQueue.cs
+++ b/csharp/rocketmq-client-csharp/MessageQueue.cs
@@ -16,23 +16,20 @@
  */
 
 using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
+using System.Linq;
+using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
     public class MessageQueue
     {
-        public MessageQueue(rmq::MessageQueue messageQueue)
+        public MessageQueue(Proto::MessageQueue messageQueue)
         {
             TopicResource = new Resource(messageQueue.Topic);
             QueueId = messageQueue.Id;
             Permission = PermissionHelper.FromProtobuf(messageQueue.Permission);
-            var messageTypes = new List<MessageType>();
-            foreach (var acceptMessageType in messageQueue.AcceptMessageTypes)
-            {
-                var messageType = MessageTypeHelper.FromProtobuf(acceptMessageType);
-                messageTypes.Add(messageType);
-            }
+            var messageTypes = messageQueue.AcceptMessageTypes
+                .Select(MessageTypeHelper.FromProtobuf).ToList();
 
             AcceptMessageTypes = messageTypes;
             Broker = new Broker(messageQueue.Broker);
@@ -57,5 +54,19 @@ namespace Org.Apache.Rocketmq
         {
             return $"{Broker.Name}.{TopicResource}.{QueueId}";
         }
+
+        public Proto.MessageQueue ToProtobuf()
+        {
+            var messageTypes = AcceptMessageTypes.Select(MessageTypeHelper.ToProtobuf).ToList();
+
+            return new Proto.MessageQueue
+            {
+                Topic = TopicResource.ToProtobuf(),
+                Id = QueueId,
+                Permission = PermissionHelper.ToProtobuf(Permission),
+                Broker = Broker.ToProtobuf(),
+                AcceptMessageTypes = { messageTypes }
+            };
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index b790fcb9..dfb45e45 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Security.Cryptography;
 using NLog;
 
@@ -30,14 +31,14 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        private readonly rmq.MessageQueue _messageQueue;
-        private readonly string _receiptHandle;
+        internal readonly MessageQueue MessageQueue;
+        internal readonly string ReceiptHandle;
         private readonly long _offset;
         private readonly bool _corrupted;
 
         internal MessageView(string messageId, string topic, byte[] body, string tag, string messageGroup,
             DateTime deliveryTime, List<string> keys, Dictionary<string, string> properties, string bornHost,
-            DateTime bornTime, int deliveryAttempt, rmq.MessageQueue messageQueue, string receiptHandle, long offset,
+            DateTime bornTime, int deliveryAttempt, MessageQueue messageQueue, string receiptHandle, long offset,
             bool corrupted)
         {
             MessageId = messageId;
@@ -51,8 +52,8 @@ namespace Org.Apache.Rocketmq
             BornHost = bornHost;
             BornTime = bornTime;
             DeliveryAttempt = deliveryAttempt;
-            _messageQueue = messageQueue;
-            _receiptHandle = receiptHandle;
+            MessageQueue = messageQueue;
+            ReceiptHandle = receiptHandle;
             _offset = offset;
             _corrupted = corrupted;
         }
@@ -79,7 +80,7 @@ namespace Org.Apache.Rocketmq
 
         public int DeliveryAttempt { get; }
 
-        public static MessageView FromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue)
+        public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue)
         {
             var topic = message.Topic.Name;
             var systemProperties = message.SystemProperties;
@@ -87,11 +88,11 @@ namespace Org.Apache.Rocketmq
             var bodyDigest = systemProperties.BodyDigest;
             var checkSum = bodyDigest.Checksum;
             var raw = message.Body.ToByteArray();
-            bool corrupted = false;
+            var corrupted = false;
             var type = bodyDigest.Type;
             switch (type)
             {
-                case rmq.DigestType.Crc32:
+                case Proto.DigestType.Crc32:
                 {
                     var expectedCheckSum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length).ToString("X");
                     if (!expectedCheckSum.Equals(checkSum))
@@ -101,7 +102,7 @@ namespace Org.Apache.Rocketmq
 
                     break;
                 }
-                case rmq.DigestType.Md5:
+                case Proto.DigestType.Md5:
                 {
                     var expectedCheckSum = Convert.ToHexString(MD5.HashData(raw));
                     if (!expectedCheckSum.Equals(checkSum))
@@ -111,7 +112,7 @@ namespace Org.Apache.Rocketmq
 
                     break;
                 }
-                case rmq.DigestType.Sha1:
+                case Proto.DigestType.Sha1:
                 {
                     var expectedCheckSum = Convert.ToHexString(SHA1.HashData(raw));
                     if (!expectedCheckSum.Equals(checkSum))
@@ -121,6 +122,7 @@ namespace Org.Apache.Rocketmq
 
                     break;
                 }
+                case Proto.DigestType.Unspecified:
                 default:
                 {
                     Logger.Error(
@@ -131,18 +133,19 @@ namespace Org.Apache.Rocketmq
             }
 
             var bodyEncoding = systemProperties.BodyEncoding;
-            byte[] body = raw;
+            var body = raw;
             switch (bodyEncoding)
             {
-                case rmq.Encoding.Gzip:
+                case Proto.Encoding.Gzip:
                 {
                     body = Utilities.DecompressBytesGzip(message.Body.ToByteArray());
                     break;
                 }
-                case rmq.Encoding.Identity:
+                case Proto.Encoding.Identity:
                 {
                     break;
                 }
+                case Proto.Encoding.Unspecified:
                 default:
                 {
                     Logger.Error($"Unsupported message encoding algorithm," +
@@ -151,25 +154,22 @@ namespace Org.Apache.Rocketmq
                 }
             }
 
-            string tag = systemProperties.HasTag ? systemProperties.Tag : null;
-            string messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null;
+            var tag = systemProperties.HasTag ? systemProperties.Tag : null;
+            var messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null;
             var deliveryTime = systemProperties.DeliveryTimestamp.ToDateTime();
-            List<string> keys = new List<string>();
-            foreach (var key in systemProperties.Keys)
-            {
-                keys.Add(key);
-            }
+            var keys = systemProperties.Keys.ToList();
 
             var bornHost = systemProperties.BornHost;
             var bornTime = systemProperties.BornTimestamp.ToDateTime();
             var deliveryAttempt = systemProperties.DeliveryAttempt;
             var queueOffset = systemProperties.QueueOffset;
-            Dictionary<string, string> properties = new Dictionary<string, string>();
+            var properties = new Dictionary<string, string>();
             foreach (var (key, value) in message.UserProperties)
             {
                 properties.Add(key, value);
             }
 
+
             var receiptHandle = systemProperties.ReceiptHandle;
             return new MessageView(messageId, topic, body, tag, messageGroup, deliveryTime, keys, properties, bornHost,
                 bornTime, deliveryAttempt, messageQueue, receiptHandle, queueOffset, corrupted);
diff --git a/csharp/rocketmq-client-csharp/MqEncoding.cs b/csharp/rocketmq-client-csharp/MqEncoding.cs
index 27dfb052..ce6f4779 100644
--- a/csharp/rocketmq-client-csharp/MqEncoding.cs
+++ b/csharp/rocketmq-client-csharp/MqEncoding.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
@@ -27,13 +27,13 @@ namespace Org.Apache.Rocketmq
 
     public static class EncodingHelper
     {
-        public static rmq.Encoding ToProtobuf(MqEncoding mqEncoding)
+        public static Proto.Encoding ToProtobuf(MqEncoding mqEncoding)
         {
             return mqEncoding switch
             {
-                MqEncoding.Gzip => rmq.Encoding.Gzip,
-                MqEncoding.Identity => rmq.Encoding.Identity,
-                _ => rmq.Encoding.Unspecified
+                MqEncoding.Gzip => Proto.Encoding.Gzip,
+                MqEncoding.Identity => Proto.Encoding.Identity,
+                _ => Proto.Encoding.Unspecified
             };
         }
     }
diff --git a/csharp/rocketmq-client-csharp/Permission.cs b/csharp/rocketmq-client-csharp/Permission.cs
index eedd6247..20d2828d 100644
--- a/csharp/rocketmq-client-csharp/Permission.cs
+++ b/csharp/rocketmq-client-csharp/Permission.cs
@@ -46,8 +46,24 @@ namespace Org.Apache.Rocketmq
                     throw new InternalErrorException("Permission is not specified");
             }
         }
-        
-        public static bool IsWritable(Permission permission) {
+
+        public static Proto.Permission ToProtobuf(Permission permission)
+        {
+            switch (permission)
+            {
+                case Permission.Read:
+                    return Proto.Permission.Read;
+                case Permission.Write:
+                    return Proto.Permission.Write;
+                case Permission.ReadWrite:
+                    return Proto.Permission.ReadWrite;
+                default:
+                    throw new InternalErrorException("Permission is not specified");
+            }
+        }
+
+        public static bool IsWritable(Permission permission)
+        {
             switch (permission)
             {
                 case Permission.Write:
@@ -74,6 +90,4 @@ namespace Org.Apache.Rocketmq
             }
         }
     }
-    
-
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index fc41926b..62387a98 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -25,7 +25,7 @@ using NLog;
 
 namespace Org.Apache.Rocketmq
 {
-    public class Producer : Client, IProducer
+    public class Producer : Client
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
         private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
@@ -42,7 +42,7 @@ namespace Org.Apache.Rocketmq
         }
 
         private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
-            base(clientConfig, topics)
+            base(clientConfig, topics.Keys)
         {
             var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
             _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
@@ -54,7 +54,7 @@ namespace Org.Apache.Rocketmq
         {
             foreach (var topic in topics)
             {
-                Topics[topic] = false;
+                Topics.Add(topic);
             }
         }
 
@@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq
             return publishingLoadBalancer;
         }
 
-        protected override void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData)
+        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
         {
             var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
             _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
diff --git a/csharp/rocketmq-client-csharp/Publishing.cs b/csharp/rocketmq-client-csharp/Publishing.cs
index ffedd177..055e36ce 100644
--- a/csharp/rocketmq-client-csharp/Publishing.cs
+++ b/csharp/rocketmq-client-csharp/Publishing.cs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 using System.Collections.Generic;
 
 namespace Org.Apache.Rocketmq
@@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq
     // Settings for publishing
     public class Publishing
     {
-        public List<rmq::Resource> Topics { get; set; }
+        public List<Proto::Resource> Topics { get; set; }
         public int CompressBodyThreshold { get; set; }
 
         public int MaxBodySize { get; set; }
diff --git a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
index c33bc7dc..2133af5a 100644
--- a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
@@ -18,23 +18,24 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
-using rmq = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
     public class PublishingLoadBalancer
     {
         private readonly List<MessageQueue> _messageQueues;
+
+        // TODO
         private int _roundRobinIndex;
 
         public PublishingLoadBalancer(TopicRouteData route)
         {
             _messageQueues = new List<MessageQueue>();
-            foreach (var messageQueue in route.MessageQueues.Where(messageQueue =>
+            foreach (var mq in route.MessageQueues.Where(messageQueue =>
                          PermissionHelper.IsWritable(messageQueue.Permission) &&
                          Utilities.MasterBrokerId == messageQueue.Broker.Id))
             {
-                _messageQueues.Add(messageQueue);
+                _messageQueues.Add(mq);
             }
 
             var random = new Random();
@@ -70,21 +71,23 @@ namespace Org.Apache.Rocketmq
                 }
             }
 
-            if (candidates.Count != 0) return candidates;
+            if (candidates.Count != 0)
+            {
+                return candidates;
+            }
+
+            foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count))
+                         .Select(positiveMod => _messageQueues[positiveMod]))
             {
-                foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count))
-                             .Select(positiveMod => _messageQueues[positiveMod]))
+                if (!candidateBrokerNames.Contains(mq.Broker.Name))
                 {
-                    if (!candidateBrokerNames.Contains(mq.Broker.Name))
-                    {
-                        candidateBrokerNames.Add(mq.Broker.Name);
-                        candidates.Add(mq);
-                    }
+                    candidateBrokerNames.Add(mq.Broker.Name);
+                    candidates.Add(mq);
+                }
 
-                    if (candidates.Count >= count)
-                    {
-                        break;
-                    }
+                if (candidates.Count >= count)
+                {
+                    break;
                 }
             }
 
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index 023b0be3..b9f8f454 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -17,7 +17,7 @@
 
 using System;
 using System.Collections.Concurrent;
-using System.Collections.Generic;
+using System.Linq;
 using Google.Protobuf.WellKnownTypes;
 using Proto = Apache.Rocketmq.V2;
 
@@ -28,8 +28,8 @@ namespace Org.Apache.Rocketmq
         private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
         private volatile bool _validateMessageType = true;
 
-        public PublishingSettings(string clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
-            TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, accessPoint,
+        public PublishingSettings(string clientId, Endpoints endpoints, ExponentialBackoffRetryPolicy retryPolicy,
+            TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, endpoints,
             retryPolicy, requestTimeout)
         {
             Topics = topics;
@@ -54,14 +54,7 @@ namespace Org.Apache.Rocketmq
 
         public override Proto.Settings ToProtobuf()
         {
-            List<Proto.Resource> topics = new List<Proto.Resource>();
-            foreach (var topic in Topics)
-            {
-                topics.Add(new Proto.Resource
-                {
-                    Name = topic.Key
-                });
-            }
+            var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList();
 
             var publishing = new Proto.Publishing();
             publishing.Topics.Add(topics);
@@ -69,7 +62,7 @@ namespace Org.Apache.Rocketmq
             return new Proto.Settings
             {
                 Publishing = publishing,
-                AccessPoint = AccessPoint.ToProtobuf(),
+                AccessPoint = Endpoints.ToProtobuf(),
                 ClientType = ClientTypeHelper.ToProtobuf(ClientType),
                 RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
                 BackoffPolicy = RetryPolicy.ToProtobuf(),
diff --git a/csharp/rocketmq-client-csharp/IProducer.cs b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
similarity index 71%
rename from csharp/rocketmq-client-csharp/IProducer.cs
rename to csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
index 420af202..1898838b 100644
--- a/csharp/rocketmq-client-csharp/IProducer.cs
+++ b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
@@ -15,16 +15,20 @@
  * limitations under the License.
  */
 
-using System.Threading.Tasks;
+using System.Collections.Generic;
 
 namespace Org.Apache.Rocketmq
 {
-    public interface IProducer
+    public class ReceiveMessageResult
     {
-        Task Start();
+        public ReceiveMessageResult(Endpoints endpoints, List<MessageView> messages)
+        {
+            Endpoints = endpoints;
+            Messages = messages;
+        }
 
-        Task Shutdown();
+        public Endpoints Endpoints { get; }
 
-        Task<SendReceipt> Send(Message message);
+        public List<MessageView> Messages { get; }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs
index a1825f15..76c0ba9e 100644
--- a/csharp/rocketmq-client-csharp/Resource.cs
+++ b/csharp/rocketmq-client-csharp/Resource.cs
@@ -11,6 +11,12 @@ namespace Org.Apache.Rocketmq
             Name = resource.Name;
         }
 
+        public Resource(string name)
+        {
+            Namespace = "";
+            Name = name;
+        }
+
         public string Namespace { get; }
         public string Name { get; }
 
@@ -22,7 +28,7 @@ namespace Org.Apache.Rocketmq
                 Name = Name
             };
         }
-        
+
         public override string ToString()
         {
             return String.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}";
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index de28c184..bf45410c 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -21,7 +21,7 @@ using System.Net.Http;
 using System.Net.Security;
 using System.Threading;
 using System.Threading.Tasks;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 using Grpc.Core;
 using Grpc.Core.Interceptors;
 using Grpc.Net.Client;
@@ -32,7 +32,7 @@ namespace Org.Apache.Rocketmq
     public class RpcClient : IRpcClient
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
-        private readonly rmq::MessagingService.MessagingServiceClient _stub;
+        private readonly Proto::MessagingService.MessagingServiceClient _stub;
         private readonly GrpcChannel _channel;
         private readonly string _target;
 
@@ -44,7 +44,7 @@ namespace Org.Apache.Rocketmq
                 HttpHandler = CreateHttpHandler()
             });
             var invoker = _channel.Intercept(new ClientLoggerInterceptor());
-            _stub = new rmq::MessagingService.MessagingServiceClient(invoker);
+            _stub = new Proto::MessagingService.MessagingServiceClient(invoker);
         }
 
         public async Task Shutdown()
@@ -74,14 +74,14 @@ namespace Org.Apache.Rocketmq
             return handler;
         }
 
-        public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
+        public AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(Metadata metadata)
         {
             var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
             var callOptions = new CallOptions(metadata, deadline);
             return _stub.Telemetry(callOptions);
         }
 
-        public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request,
+        public async Task<Proto::QueryRouteResponse> QueryRoute(Metadata metadata, Proto::QueryRouteRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -92,7 +92,7 @@ namespace Org.Apache.Rocketmq
         }
 
 
-        public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request,
+        public async Task<Proto::HeartbeatResponse> Heartbeat(Metadata metadata, Proto::HeartbeatRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -102,7 +102,7 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request,
+        public async Task<Proto::SendMessageResponse> SendMessage(Metadata metadata, Proto::SendMessageRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -112,8 +112,8 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata,
-            rmq::QueryAssignmentRequest request,
+        public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Metadata metadata,
+            Proto::QueryAssignmentRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -123,14 +123,14 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
-            rmq::ReceiveMessageRequest request, TimeSpan timeout)
+        public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
+            Proto::ReceiveMessageRequest request, TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
             var call = _stub.ReceiveMessage(request, callOptions);
             Logger.Debug($"ReceiveMessageRequest has been written to {_target}");
-            var result = new List<rmq::ReceiveMessageResponse>();
+            var result = new List<Proto::ReceiveMessageResponse>();
             var stream = call.ResponseStream;
             while (await stream.MoveNext())
             {
@@ -143,7 +143,7 @@ namespace Org.Apache.Rocketmq
             return result;
         }
 
-        public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+        public async Task<Proto::AckMessageResponse> AckMessage(Metadata metadata, Proto::AckMessageRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -153,8 +153,8 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata,
-            rmq::ChangeInvisibleDurationRequest request,
+        public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata,
+            Proto::ChangeInvisibleDurationRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -164,9 +164,9 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(
+        public async Task<Proto::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(
             Metadata metadata,
-            rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+            Proto::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
@@ -175,8 +175,8 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata,
-            rmq::EndTransactionRequest request,
+        public async Task<Proto::EndTransactionResponse> EndTransaction(Metadata metadata,
+            Proto::EndTransactionRequest request,
             TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
@@ -186,8 +186,8 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
-            rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+        public async Task<Proto::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+            Proto::NotifyClientTerminationRequest request, TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs
index 7716fc2d..491aa564 100644
--- a/csharp/rocketmq-client-csharp/Settings.cs
+++ b/csharp/rocketmq-client-csharp/Settings.cs
@@ -24,25 +24,25 @@ namespace Org.Apache.Rocketmq
     {
         protected readonly string ClientId;
         protected readonly ClientType ClientType;
-        protected readonly Endpoints AccessPoint;
+        protected readonly Endpoints Endpoints;
         protected volatile IRetryPolicy RetryPolicy;
         protected readonly TimeSpan RequestTimeout;
 
-        public Settings(string clientId, ClientType clientType, Endpoints accessPoint, IRetryPolicy retryPolicy,
+        public Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
             TimeSpan requestTimeout)
         {
             ClientId = clientId;
             ClientType = clientType;
-            AccessPoint = accessPoint;
+            Endpoints = endpoints;
             RetryPolicy = retryPolicy;
             RequestTimeout = requestTimeout;
         }
 
-        public Settings(string clientId, ClientType clientType, Endpoints accessPoint, TimeSpan requestTimeout)
+        public Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
         {
             ClientId = clientId;
             ClientType = clientType;
-            AccessPoint = accessPoint;
+            Endpoints = endpoints;
             RetryPolicy = null;
             RequestTimeout = requestTimeout;
         }
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 00000000..1a0f0ec2
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,192 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+using NLog;
+using Org.Apache.Rocketmq.Error;
+
+namespace Org.Apache.Rocketmq
+{
+    public class SimpleConsumer : Consumer
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache;
+        private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
+        private readonly TimeSpan _awaitDuration;
+        private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;
+        private int _topicRoundRobinIndex;
+
+        public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
+            Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
+            awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions))
+        {
+        }
+
+        private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
+            ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig, consumerGroup,
+            subscriptionExpressions.Keys)
+        {
+            _awaitDuration = awaitDuration;
+            _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
+            _subscriptionExpressions = subscriptionExpressions;
+            _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, clientConfig.Endpoints,
+                ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
+            _topicRoundRobinIndex = 0;
+        }
+
+        public async Task Subscribe(string topic, FilterExpression filterExpression)
+        {
+            // TODO: check running status.
+            await GetSubscriptionLoadBalancer(topic);
+            _subscriptionExpressions[topic] = filterExpression; // overwrite: a re-subscribe must apply the new filter
+        }
+
+        public void Unsubscribe(string topic)
+        {
+            _subscriptionExpressions.TryRemove(topic, out _);
+        }
+
+        public override async Task Start()
+        {
+            Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
+            await base.Start();
+            Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+        }
+
+        public override async Task Shutdown()
+        {
+            Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
+            await base.Shutdown();
+            Logger.Info($"Shutdown the rocketmq simple consumer successfully, clientId={ClientId}");
+        }
+
+        protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
+        {
+            return new Proto::HeartbeatRequest
+            {
+                ClientType = Proto.ClientType.SimpleConsumer
+            };
+        }
+
+        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+        {
+            var subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
+            _subscriptionRouteDataCache[topic] = subscriptionLoadBalancer; // replace: TryAdd kept the stale entry
+        }
+
+        public override Proto.Settings GetSettings()
+        {
+            return _simpleSubscriptionSettings.ToProtobuf();
+        }
+
+        public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+        {
+            _simpleSubscriptionSettings.Sync(settings);
+        }
+
+
+        private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
+        {
+            if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
+            {
+                return subscriptionLoadBalancer;
+            }
+
+            var topicRouteData = await FetchTopicRoute(topic);
+            subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
+            _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
+
+            return subscriptionLoadBalancer;
+        }
+
+
+        public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
+        {
+            if (maxMessageNum <= 0)
+            {
+                throw new InternalErrorException("maxMessageNum must be greater than 0");
+            }
+
+            var copy = new ConcurrentDictionary<string, FilterExpression>(_subscriptionExpressions);
+            var topics = new List<string>(copy.Keys);
+            if (topics.Count <= 0)
+            {
+                throw new ArgumentException("There is no topic to receive message");
+            }
+
+            var index = Utilities.GetPositiveMod(Interlocked.Increment(ref _topicRoundRobinIndex), topics.Count);
+            var topic = topics[index];
+            var filterExpression = copy[topic]; // read the snapshot: the live map may have lost this topic meanwhile
+            var subscriptionLoadBalancer = await GetSubscriptionLoadBalancer(topic);
+
+            var mq = subscriptionLoadBalancer.TakeMessageQueue();
+            var request = WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration);
+            var receiveMessageResult = await ReceiveMessage(request, mq, _awaitDuration);
+            return receiveMessageResult.Messages;
+        }
+
+        public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
+        {
+            var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
+            var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
+                request, ClientConfig.RequestTimeout);
+            StatusChecker.Check(response.Status, request); // with Task (not void) a failure reaches the awaiting caller
+        }
+
+
+        public async Task Ack(MessageView messageView)
+        {
+            var request = WrapAckMessageRequest(messageView);
+            var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+                ClientConfig.RequestTimeout);
+            StatusChecker.Check(response.Status, request);
+        }
+
+        private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
+        {
+            var topicResource = new Proto.Resource
+            {
+                Name = messageView.Topic
+            };
+            var entry = new Proto.AckMessageEntry
+            {
+                MessageId = messageView.MessageId,
+                ReceiptHandle = messageView.ReceiptHandle,
+            };
+            return new Proto.AckMessageRequest
+            {
+                Group = GetProtobufGroup(),
+                Topic = topicResource,
+                Entries = { entry }
+            };
+        }
+
+        private Proto.ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
+            TimeSpan invisibleDuration)
+        {
+            var topicResource = new Proto.Resource
+            {
+                Name = messageView.Topic
+            };
+            return new Proto.ChangeInvisibleDurationRequest
+            {
+                Topic = topicResource,
+                Group = GetProtobufGroup(),
+                ReceiptHandle = messageView.ReceiptHandle,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
+                MessageId = messageView.MessageId
+            };
+        }
+
+        private Proto.Resource GetProtobufGroup()
+        {
+            return new Proto.Resource()
+            {
+                Name = ConsumerGroup
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
new file mode 100644
index 00000000..a6a409d7
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Google.Protobuf.WellKnownTypes;
+using NLog;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    public class SimpleSubscriptionSettings : Settings
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+        private readonly Resource _group;
+        private readonly TimeSpan _longPollingTimeout;
+        private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
+
+        public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
+            TimeSpan requestTimeout, TimeSpan longPollingTimeout,
+            ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(
+            clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
+        {
+            _group = new Resource(consumerGroup);
+            _longPollingTimeout = longPollingTimeout;
+            _subscriptionExpressions = subscriptionExpressions;
+        }
+
+        public override void Sync(Proto::Settings settings)
+        {
+            // TODO
+        }
+
+        public override Proto.Settings ToProtobuf()
+        {
+            var subscriptionEntries = new List<Proto.SubscriptionEntry>();
+            foreach (var (key, value) in _subscriptionExpressions)
+            {
+                var topic = new Proto.Resource()
+                {
+                    Name = key,
+                };
+                var subscriptionEntry = new Proto.SubscriptionEntry { Topic = topic };
+                var filterExpression = new Proto.FilterExpression();
+                switch (value.Type)
+                {
+                    case ExpressionType.Tag:
+                        filterExpression.Type = Proto.FilterType.Tag;
+                        break;
+                    case ExpressionType.Sql92:
+                        filterExpression.Type = Proto.FilterType.Sql;
+                        break;
+                    default:
+                        Logger.Warn($"[Bug] Unrecognized filter type={value.Type} for simple consumer");
+                        break;
+                }
+
+                filterExpression.Expression = value.Expression;
+                subscriptionEntry.Expression = filterExpression; // attach the filter; it was built but never set
+                subscriptionEntries.Add(subscriptionEntry);
+            }
+
+            var subscription = new Proto.Subscription
+            {
+                Group = _group.ToProtobuf(),
+                Subscriptions = { subscriptionEntries },
+                LongPollingTimeout = Duration.FromTimeSpan(_longPollingTimeout)
+            };
+            return new Proto.Settings
+            {
+                AccessPoint = Endpoints.ToProtobuf(),
+                ClientType = ClientTypeHelper.ToProtobuf(ClientType),
+                RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
+                Subscription = subscription,
+                UserAgent = UserAgent.Instance.ToProtobuf()
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs
index 641fd097..2802fd78 100644
--- a/csharp/rocketmq-client-csharp/StatusChecker.cs
+++ b/csharp/rocketmq-client-csharp/StatusChecker.cs
@@ -31,6 +31,7 @@ namespace Org.Apache.Rocketmq
             var statusCode = status.Code;
 
             var statusMessage = status.Message;
+            // TODO
             switch (statusCode)
             {
                 case Proto.Code.Ok:
diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
index b77da833..a8b1963e 100644
--- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
@@ -15,37 +15,35 @@
  * limitations under the License.
  */
 
+using System;
 using System.Collections.Generic;
-using System.Threading;
-using rmq = Apache.Rocketmq.V2;
+using System.Linq;
 
 namespace Org.Apache.Rocketmq
 {
     internal sealed class SubscriptionLoadBalancer
     {
-        public List<rmq.Assignment> Assignments { get; private set; }
-        private uint index = 0;
+        private readonly List<MessageQueue> _messageQueues;
+        private int _roundRobinIndex;
 
-        public SubscriptionLoadBalancer(List<rmq.Assignment> assignments)
+        public SubscriptionLoadBalancer(TopicRouteData topicRouteData)
         {
-            Assignments = assignments;
-        }
+            _messageQueues = new List<MessageQueue>();
+            foreach (var mq in topicRouteData.MessageQueues.Where(mq => PermissionHelper.IsReadable(mq.Permission))
+                         .Where(mq => Utilities.MasterBrokerId == mq.Broker.Id))
+            {
+                _messageQueues.Add(mq);
+            }
 
-        private SubscriptionLoadBalancer(uint oldIndex, List<rmq.Assignment> assignments)
-        {
-            index = oldIndex;
-            Assignments = assignments;
-        }
-
-        public SubscriptionLoadBalancer Update(List<rmq.Assignment> newAssignments)
-        {
-            return new SubscriptionLoadBalancer(index, newAssignments);
+            var random = new Random();
+            _roundRobinIndex = random.Next(0, _messageQueues.Count);
         }
 
-        public rmq.MessageQueue TakeMessageQueue()
+        public MessageQueue TakeMessageQueue()
         {
-            var i = Interlocked.Increment(ref index);
-            return Assignments[(int)(i % Assignments.Count)].MessageQueue;
+            var next = System.Threading.Interlocked.Increment(ref _roundRobinIndex); // atomic, like the removed line
+            var index = Utilities.GetPositiveMod(next, _messageQueues.Count);
+            return _messageQueues[index];
         }
     }
-}
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 2be2b9a5..885db5f6 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -18,19 +18,15 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
     public class TopicRouteData : IEquatable<TopicRouteData>
     {
-        public TopicRouteData(List<rmq::MessageQueue> messageQueues)
+        public TopicRouteData(IEnumerable<Proto.MessageQueue> messageQueues)
         {
-            var messageQueuesList = new List<MessageQueue>();
-            foreach (var mq in messageQueues)
-            {
-                messageQueuesList.Add(new MessageQueue(mq));
-            }
+            var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList();
 
             MessageQueues = messageQueuesList;
         }
diff --git a/csharp/rocketmq-client-csharp/Utilities.cs b/csharp/rocketmq-client-csharp/Utilities.cs
index 592f364e..d032ae1e 100644
--- a/csharp/rocketmq-client-csharp/Utilities.cs
+++ b/csharp/rocketmq-client-csharp/Utilities.cs
@@ -22,7 +22,6 @@ using System;
 using System.IO;
 using System.IO.Compression;
 using System.Threading;
-using rmq = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq
 {
@@ -81,10 +80,10 @@ namespace Org.Apache.Rocketmq
 
         public static string ByteArrayToHexString(byte[] bytes)
         {
-            StringBuilder result = new StringBuilder(bytes.Length * 2);
+            var result = new StringBuilder(bytes.Length * 2);
             const string hexAlphabet = "0123456789ABCDEF";
 
-            foreach (byte b in bytes)
+            foreach (var b in bytes)
             {
                 result.Append(hexAlphabet[(int)(b >> 4)]);
                 result.Append(hexAlphabet[(int)(b & 0xF)]);
diff --git a/csharp/tests/UnitTest1.cs b/csharp/tests/UnitTest1.cs
index af13d3dc..2f6b468a 100644
--- a/csharp/tests/UnitTest1.cs
+++ b/csharp/tests/UnitTest1.cs
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.Rocketmq;
-using Grpc.Net.Client;
-using rmq = Apache.Rocketmq.V2;
 
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Proto = Apache.Rocketmq.V2;
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
@@ -31,33 +29,32 @@ namespace tests
         [TestMethod]
         public void TestMethod1()
         {
-            rmq::Permission perm = rmq::Permission.None;
+            Proto::Permission perm = Proto::Permission.None;
             switch (perm)
             {
-                case rmq::Permission.None:
-                    {
-                        Console.WriteLine("None");
-                        break;
-                    }
-
-                case rmq::Permission.Read:
-                    {
-                        Console.WriteLine("Read");
-                        break;
-                    }
+                case Proto::Permission.None:
+                {
+                    Console.WriteLine("None");
+                    break;
+                }
 
-                case rmq::Permission.Write:
-                    {
-                        Console.WriteLine("Write");
-                        break;
-                    }
+                case Proto::Permission.Read:
+                {
+                    Console.WriteLine("Read");
+                    break;
+                }
 
-                case rmq::Permission.ReadWrite:
-                    {
-                        Console.WriteLine("ReadWrite");
-                        break;
-                    }
+                case Proto::Permission.Write:
+                {
+                    Console.WriteLine("Write");
+                    break;
+                }
 
+                case Proto::Permission.ReadWrite:
+                {
+                    Console.WriteLine("ReadWrite");
+                    break;
+                }
             }
         }
 
@@ -82,4 +79,4 @@ namespace tests
             Assert.AreEqual(1, list.Count);
         }
     }
-}
+}
\ No newline at end of file