You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 05:24:57 UTC

[rocketmq-clients] branch csharp_dev updated: Clean up code

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/csharp_dev by this push:
     new c3233f1  Clean up code
c3233f1 is described below

commit c3233f131d99d86d21d0e1b73b01c787a5179802
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Sep 1 13:24:47 2022 +0800

    Clean up code
---
 csharp/examples/Program.cs                      |  44 ++++--
 csharp/rocketmq-client-csharp/Client.cs         | 183 ++++++++++++++----------
 csharp/rocketmq-client-csharp/IClient.cs        |   4 +-
 csharp/rocketmq-client-csharp/Session.cs        |  39 +++--
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 181 +++++++++++++----------
 5 files changed, 262 insertions(+), 189 deletions(-)

diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index e0d3851..c0dd734 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -23,11 +23,13 @@ namespace examples
 {
     class Program
     {
-        private const string accessUrl = "rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
-        private const string standardTopic = "sdk_standard";
-        private const string fifoTopic = "sdk_fifo";
-        private const string timedTopic = "sdk_timed";
-        private const string transactionalTopic = "sdk_transactional";
+        private const string ACCESS_URL = "rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+        private const string STANDARD_TOPIC = "sdk_standard";
+        private const string FIFO_TOPIC = "sdk_fifo";
+        private const string TIMED_TOPIC = "sdk_timed";
+        private const string TRANSACTIONAL_TOPIC = "sdk_transactional";
+
+        private const string CONCURRENT_GROUP = "sdk_concurrency";
         
         private static async Task<SendReceipt> SendStandardMessage(Producer producer)
         {
@@ -40,7 +42,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(standardTopic, body)
+            var msg = new Message(STANDARD_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -63,7 +65,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(fifoTopic, body)
+            var msg = new Message(FIFO_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -89,7 +91,7 @@ namespace examples
                 "k2"
             };
             
-            var msg = new Message(timedTopic, body)
+            var msg = new Message(TIMED_TOPIC, body)
             {
                 // Tag the massage. A message has at most one tag.
                 Tag = "Tag-0",
@@ -100,16 +102,30 @@ namespace examples
             msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30);
             return await producer.Send(msg);
         }
+
+        private static async Task ConsumeMessage(SimpleConsumer simpleConsumer)
+        {
+            var messages = await simpleConsumer.Receive(32, TimeSpan.FromSeconds(60));
+            if (null != messages)
+            {
+                foreach (var message in messages)
+                {
+                    Console.WriteLine($"Receive a message, topic={message.Topic}, message-id={message.MessageId}");
+                }
+            }
+        }
         
         static async Task Main(string[] args)
         {
             var credentialsProvider = new ConfigFileCredentialsProvider();
-            var producer = new Producer(accessUrl);
-            producer.CredentialsProvider = credentialsProvider;
-            producer.AddTopicOfInterest(standardTopic);
-            producer.AddTopicOfInterest(fifoTopic);
-            producer.AddTopicOfInterest(timedTopic);
-            producer.AddTopicOfInterest(transactionalTopic);
+            var producer = new Producer(ACCESS_URL)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            producer.AddTopicOfInterest(STANDARD_TOPIC);
+            producer.AddTopicOfInterest(FIFO_TOPIC);
+            producer.AddTopicOfInterest(TIMED_TOPIC);
+            producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
 
             await producer.Start();
 
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 0fe4ec9..4b4884d 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -22,7 +22,7 @@ using System.Threading;
 using System.Diagnostics;
 using System;
 using rmq = Apache.Rocketmq.V2;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using NLog;
 using System.Diagnostics.Metrics;
 
@@ -37,34 +37,39 @@ namespace Org.Apache.Rocketmq
             AccessPoint = new AccessPoint(accessUrl);
 
             AccessPointScheme = AccessPoint.HostScheme();
-            var serviceEndpoint = new rmq::Address();
-            serviceEndpoint.Host = AccessPoint.Host;
-            serviceEndpoint.Port = AccessPoint.Port;
+            var serviceEndpoint = new rmq::Address
+            {
+                Host = AccessPoint.Host,
+                Port = AccessPoint.Port
+            };
             AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
 
             _resourceNamespace = "";
 
-            ClientSettings = new rmq::Settings();
+            ClientSettings = new rmq::Settings
+            {
+                AccessPoint = new rmq::Endpoints
+                {
+                    Scheme = rmq::AddressScheme.Ipv4
+                }
+            };
 
-            ClientSettings.AccessPoint = new rmq::Endpoints();
-            ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
             ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
 
             ClientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
 
-            ClientSettings.UserAgent = new rmq.UA();
-            ClientSettings.UserAgent.Language = rmq::Language.DotNet;
-            ClientSettings.UserAgent.Version = "5.0.0";
-            ClientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
-            ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            ClientSettings.UserAgent = new rmq.UA
+            {
+                Language = rmq::Language.DotNet,
+                Version = "5.0.0",
+                Platform = Environment.OSVersion.ToString(),
+                Hostname = System.Net.Dns.GetHostName()
+            };
 
             _manager = new ClientManager();
 
             _topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
             _updateTopicRouteCts = new CancellationTokenSource();
-
-            _healthCheckCts = new CancellationTokenSource();
-
             _telemetryCts = new CancellationTokenSource();
         }
 
@@ -95,7 +100,7 @@ namespace Org.Apache.Rocketmq
             await _manager.Shutdown();
         }
 
-        protected string FilterBroker(Func<string, bool> acceptor)
+        private string FilterBroker(Func<string, bool> acceptor)
         {
             foreach (var item in _topicRouteTable)
             {
@@ -188,7 +193,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public void Schedule(Action action, int seconds, CancellationToken token)
+        protected void Schedule(Action action, int seconds, CancellationToken token)
         {
             if (null == action)
             {
@@ -213,7 +218,7 @@ namespace Org.Apache.Rocketmq
          * direct
          *    Indicate if we should by-pass cache and fetch route entries from name server.
          */
-        public async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
+        protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
         {
             if (!direct && _topicRouteTable.ContainsKey(topic))
             {
@@ -221,12 +226,18 @@ namespace Org.Apache.Rocketmq
             }
 
             // We got one or more name servers available.
-            var request = new rmq::QueryRouteRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = AccessPointScheme;
+            var request = new rmq::QueryRouteRequest
+            {
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                Endpoints = new rmq::Endpoints
+                {
+                    Scheme = AccessPointScheme
+                }
+            };
             foreach (var address in AccessPointEndpoints)
             {
                 request.Endpoints.Addresses.Add(address);
@@ -238,21 +249,17 @@ namespace Org.Apache.Rocketmq
             var serviceEndpoint = AccessPointEndpoints[index];
             // AccessPointAddresses.Count
             string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
-            TopicRouteData topicRouteData;
             try
             {
                 Logger.Debug($"Resolving route for topic={topic}");
-                topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
+                var topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
                 if (null != topicRouteData)
                 {
                     Logger.Debug($"Got route entries for {topic} from name server");
                     _topicRouteTable.TryAdd(topic, topicRouteData);
                     return topicRouteData;
                 }
-                else
-                {
-                    Logger.Warn($"Failed to query route of {topic} from {target}");
-                }
+                Logger.Warn($"Failed to query route of {topic} from {target}");
             }
             catch (Exception e)
             {
@@ -273,7 +280,11 @@ namespace Org.Apache.Rocketmq
                 return;
             }
 
-            var request = new rmq::HeartbeatRequest();
+            var request = new rmq::HeartbeatRequest
+            {
+                Group = null,
+                ClientType = rmq.ClientType.Unspecified
+            };
             PrepareHeartbeatData(request);
 
             var metadata = new grpc::Metadata();
@@ -303,15 +314,24 @@ namespace Org.Apache.Rocketmq
         {
             // Pick a broker randomly
             string target = FilterBroker((s) => true);
-            var request = new rmq::QueryAssignmentRequest();
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-            request.Endpoints = new rmq::Endpoints();
-            request.Endpoints.Scheme = AccessPointScheme;
+            var request = new rmq::QueryAssignmentRequest
+            {
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Endpoints = new rmq::Endpoints
+                {
+                    Scheme = AccessPointScheme
+                }
+            };
+            
             foreach (var endpoint in AccessPointEndpoints)
             {
                 request.Endpoints.Addresses.Add(endpoint);
@@ -344,12 +364,12 @@ namespace Org.Apache.Rocketmq
             settings.MergeFrom(ClientSettings);
         }
 
-        public void CreateSession(string url)
+        private void CreateSession(string url)
         {
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
             var stream = _manager.Telemetry(url, metadata);
-            var session = new Session(url, stream, this);
+            var session = new Session(stream, this);
             _sessions.TryAdd(url, session);
             Task.Run(async () =>
             {
@@ -358,34 +378,45 @@ namespace Org.Apache.Rocketmq
         }
 
 
-        public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+        internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
         {
             var targetUrl = TargetUrl(assignment);
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            var request = new rmq::ReceiveMessageRequest();
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-            request.MessageQueue = assignment.MessageQueue;
+            var request = new rmq::ReceiveMessageRequest
+            {
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                MessageQueue = assignment.MessageQueue
+            };
             var messages = await _manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
             return messages;
         }
 
         public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
         {
-            var request = new rmq::AckMessageRequest();
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-
-            var entry = new rmq::AckMessageEntry();
-            entry.ReceiptHandle = receiptHandle;
-            entry.MessageId = messageId;
+            var request = new rmq::AckMessageRequest
+            {
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                }
+            };
+
+            var entry = new rmq::AckMessageEntry
+            {
+                ReceiptHandle = receiptHandle,
+                MessageId = messageId
+            };
             request.Entries.Add(entry);
 
             var metadata = new grpc::Metadata();
@@ -395,17 +426,21 @@ namespace Org.Apache.Rocketmq
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
         {
-            var request = new rmq::ChangeInvisibleDurationRequest();
-            request.ReceiptHandle = receiptHandle;
-            request.Group = new rmq::Resource();
-            request.Group.ResourceNamespace = _resourceNamespace;
-            request.Group.Name = group;
-
-            request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = _resourceNamespace;
-            request.Topic.Name = topic;
-
-            request.MessageId = messageId;
+            var request = new rmq::ChangeInvisibleDurationRequest
+            {
+                ReceiptHandle = receiptHandle,
+                Group = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = group
+                },
+                Topic = new rmq::Resource
+                {
+                    ResourceNamespace = _resourceNamespace,
+                    Name = topic
+                },
+                MessageId = messageId
+            };
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
@@ -439,7 +474,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public virtual void OnSettingsReceived(rmq::Settings settings)
+        internal virtual void OnSettingsReceived(rmq::Settings settings)
         {
             if (null != settings.Metric)
             {
@@ -465,9 +500,7 @@ namespace Org.Apache.Rocketmq
 
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
         private readonly CancellationTokenSource _updateTopicRouteCts;
-
-        private readonly CancellationTokenSource _healthCheckCts;
-
+        
         private readonly CancellationTokenSource _telemetryCts;
 
         public CancellationTokenSource TelemetryCts()
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index 3352028..a96e940 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -30,9 +30,7 @@ namespace Org.Apache.Rocketmq
         Task<bool> NotifyClientTermination();
 
         void BuildClientSetting(rmq::Settings settings);
-
-
-        void OnSettingsReceived(rmq::Settings settings);
+        
 
         CancellationTokenSource TelemetryCts();
     }
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index a6be057..d2011bf 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -18,7 +18,7 @@
 using System.Threading;
 using System.Threading.Channels;
 using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
 using NLog;
 using rmq = Apache.Rocketmq.V2;
 
@@ -28,22 +28,22 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public Session(string target,
-            grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
-            IClient client)
+        public Session(grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+            Client client)
         {
-            this._target = target;
-            this._stream = stream;
-            this._client = client;
-            this._channel = Channel.CreateUnbounded<bool>();
+            _stream = stream;
+            _client = client;
+            _channel = Channel.CreateUnbounded<bool>();
         }
 
         public async Task Loop()
         {
-            var reader = this._stream.ResponseStream;
-            var writer = this._stream.RequestStream;
-            var request = new rmq::TelemetryCommand();
-            request.Settings = new rmq::Settings();
+            var reader = _stream.ResponseStream;
+            var writer = _stream.RequestStream;
+            var request = new rmq::TelemetryCommand
+            {
+                Settings = new rmq::Settings()
+            };
             _client.BuildClientSetting(request.Settings);
             await writer.WriteAsync(request);
             Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
@@ -94,13 +94,6 @@ namespace Org.Apache.Rocketmq
             await writer.CompleteAsync();
         }
 
-        private string _target;
-
-        public string Target
-        {
-            get { return _target; }
-        }
-
         public async Task AwaitSettingNegotiationCompletion()
         {
             if (0 != Interlocked.Read(ref _established))
@@ -112,11 +105,11 @@ namespace Org.Apache.Rocketmq
             await _channel.Reader.ReadAsync();
         }
 
-        private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
-        private IClient _client;
+        private readonly grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+        private readonly Client _client;
 
-        private long _established = 0;
+        private long _established;
 
-        private Channel<bool> _channel;
+        private readonly Channel<bool> _channel;
     };
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d4694ac..c2f4d58 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -33,7 +33,6 @@ namespace Org.Apache.Rocketmq
         public SimpleConsumer(string accessUrl, string group)
         : base(accessUrl)
         {
-            _fifo = false;
             _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
             _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
             _group = group;
@@ -44,10 +43,14 @@ namespace Org.Apache.Rocketmq
             base.BuildClientSetting(settings);
 
             settings.ClientType = rmq::ClientType.SimpleConsumer;
-            settings.Subscription = new rmq::Subscription();
-            settings.Subscription.Group = new rmq::Resource();
-            settings.Subscription.Group.Name = _group;
-            settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+            settings.Subscription = new rmq::Subscription
+            {
+                Group = new rmq::Resource
+                {
+                    Name = _group,
+                    ResourceNamespace = ResourceNamespace
+                }
+            };
 
             foreach (var kv in _subscriptions)
             {
@@ -87,20 +90,30 @@ namespace Org.Apache.Rocketmq
             List<string> topics = new List<string>();
             foreach (var sub in _subscriptions)
             {
-                var request = new rmq::QueryAssignmentRequest();
-                request.Topic = new rmq::Resource();
-                request.Topic.ResourceNamespace = ResourceNamespace;
-                request.Topic.Name = sub.Key;
+                var request = new rmq::QueryAssignmentRequest
+                {
+                    Topic = new rmq::Resource
+                    {
+                        ResourceNamespace = ResourceNamespace,
+                        Name = sub.Key
+                    }
+                };
                 topics.Add(sub.Key);
-                request.Group = new rmq::Resource();
-                request.Group.Name = _group;
-                request.Group.ResourceNamespace = ResourceNamespace;
-
-                request.Endpoints = new rmq::Endpoints();
-                request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
-                var address = new rmq::Address();
-                address.Host = AccessPoint.Host;
-                address.Port = AccessPoint.Port;
+                request.Group = new rmq::Resource
+                {
+                    Name = _group,
+                    ResourceNamespace = ResourceNamespace
+                };
+
+                request.Endpoints = new rmq::Endpoints
+                {
+                    Scheme = rmq.AddressScheme.Ipv4
+                };
+                var address = new rmq::Address
+                {
+                    Host = AccessPoint.Host,
+                    Port = AccessPoint.Port
+                };
                 request.Endpoints.Addresses.Add(address);
 
                 var metadata = new Metadata();
@@ -129,31 +142,39 @@ namespace Org.Apache.Rocketmq
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
         {
             request.ClientType = rmq::ClientType.SimpleConsumer;
-            request.Group = new rmq::Resource();
-            request.Group.Name = _group;
-            request.Group.ResourceNamespace = ResourceNamespace;
+            request.Group = new rmq::Resource
+            {
+                Name = _group,
+                ResourceNamespace = ResourceNamespace
+            };
         }
 
         public void Subscribe(string topic, rmq::FilterType filterType, string expression)
         {
-            var entry = new rmq::SubscriptionEntry();
-            entry.Topic = new rmq::Resource();
-            entry.Topic.Name = topic;
-            entry.Topic.ResourceNamespace = ResourceNamespace;
-            entry.Expression = new rmq::FilterExpression();
-            entry.Expression.Type = filterType;
-            entry.Expression.Expression = expression;
+            var entry = new rmq::SubscriptionEntry
+            {
+                Topic = new rmq::Resource
+                {
+                    Name = topic,
+                    ResourceNamespace = ResourceNamespace
+                },
+                Expression = new rmq::FilterExpression
+                {
+                    Type = filterType,
+                    Expression = expression
+                }
+            };
+
             _subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
             AddTopicOfInterest(topic);
         }
 
-        public override void OnSettingsReceived(rmq.Settings settings)
+        internal override void OnSettingsReceived(rmq.Settings settings)
         {
             base.OnSettingsReceived(settings);
 
             if (settings.Subscription.Fifo)
             {
-                _fifo = true;
                 Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
             }
         }
@@ -167,12 +188,16 @@ namespace Org.Apache.Rocketmq
                 return new List<Message>();
             }
 
-            var request = new rmq.ReceiveMessageRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
+            var request = new rmq.ReceiveMessageRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                MessageQueue = new rmq.MessageQueue()
+            };
 
-            request.MessageQueue = new rmq.MessageQueue();
             request.MessageQueue.MergeFrom(messageQueue);
             request.BatchSize = batchSize;
             
@@ -189,15 +214,20 @@ namespace Org.Apache.Rocketmq
 
         public async Task Ack(Message message)
         {
-            var request = new rmq.AckMessageRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
-
-            request.Topic = new rmq.Resource();
-            request.Topic.ResourceNamespace = ResourceNamespace;
-            request.Topic.Name = message.Topic;
-            
+            var request = new rmq.AckMessageRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                Topic = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = message.Topic
+                }
+            };
+
             var entry = new rmq.AckMessageEntry();
             request.Entries.Add(entry);
             entry.MessageId = message.MessageId;
@@ -211,19 +241,22 @@ namespace Org.Apache.Rocketmq
 
         public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
         {
-            var request = new rmq.ChangeInvisibleDurationRequest();
-            request.Group = new rmq.Resource();
-            request.Group.ResourceNamespace = ResourceNamespace;
-            request.Group.Name = _group;
-
-            request.Topic = new rmq.Resource();
-            request.Topic.ResourceNamespace = ResourceNamespace;
-            request.Topic.Name = message.Topic;
-
-            request.ReceiptHandle = message._receiptHandle;
-            request.MessageId = message.MessageId;
-            
-            request.InvisibleDuration = Duration.FromTimeSpan(invisibleDuration);
+            var request = new rmq.ChangeInvisibleDurationRequest
+            {
+                Group = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = _group
+                },
+                Topic = new rmq.Resource
+                {
+                    ResourceNamespace = ResourceNamespace,
+                    Name = message.Topic
+                },
+                ReceiptHandle = message._receiptHandle,
+                MessageId = message.MessageId,
+                InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+            };
 
             var targetUrl = message._sourceHost;
             var metadata = new Metadata();
@@ -238,35 +271,35 @@ namespace Org.Apache.Rocketmq
                 return null;
             }
             
-            UInt32 topicSeq = CurrentTopicSequence.Value;
-            CurrentTopicSequence.Value = topicSeq + 1;
+            var topicSeq = _currentTopicSequence.Value;
+            _currentTopicSequence.Value = topicSeq + 1;
 
             var total = _topicAssignments.Count;
             var topicIndex = topicSeq % total;
             var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
             
-            UInt32 queueSeq = CurrentQueueSequence.Value;
-            CurrentQueueSequence.Value = queueSeq + 1;
-            List<rmq.Assignment> assignments;
-            if (_topicAssignments.TryGetValue(topic, out assignments))
+            UInt32 queueSeq = _currentQueueSequence.Value;
+            _currentQueueSequence.Value = queueSeq + 1;
+            if (!_topicAssignments.TryGetValue(topic, out var assignments))
             {
-                if (null == assignments)
-                {
-                    return null;
-                }
-                var idx = queueSeq % assignments.Count;
-                return assignments[(int)idx].MessageQueue;
-
+                return null;
             }
 
-            return null;
+            var idx = queueSeq % assignments?.Count;
+            return assignments?[(int)idx].MessageQueue;
         }
 
-        private ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>(true);
-        private ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>(true);
+        private readonly ThreadLocal<UInt32> _currentTopicSequence = new ThreadLocal<UInt32>(true)
+        {
+            Value = 0
+        };
+        
+        private readonly ThreadLocal<UInt32> _currentQueueSequence = new ThreadLocal<UInt32>(true)
+        {
+            Value = 0
+        };
 
         private readonly string _group;
-        private bool _fifo;
         private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
         private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
         private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();