You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/14 08:57:49 UTC

[rocketmq-clients] branch master updated (7849cbce -> 6766a8ff)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


    from 7849cbce Increase metric exporter RPC timeout to 5 seconds
     new a5aa817c Bugfix: ConcurrentDictionary#TryAdd will not replace the existing element
     new 6766a8ff Fix code style

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 csharp/rocketmq-client-csharp/Producer.cs          | 23 ++++++++++++----
 .../PublishingLoadBalancer.cs                      | 22 ++++++++++-----
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 32 ++++++++++++++--------
 .../SubscriptionLoadBalancer.cs                    | 20 ++++++++++----
 4 files changed, 68 insertions(+), 29 deletions(-)


[rocketmq-clients] 01/02: Bugfix: ConcurrentDictionary#TryAdd will not replace the existing element

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit a5aa817c2d9e751e3efe6c4a2aa0e952235937cc
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Mar 14 16:49:00 2023 +0800

    Bugfix: ConcurrentDictionary#TryAdd will not replace the existing element
---
 csharp/rocketmq-client-csharp/Producer.cs          | 23 +++++++++++----
 .../PublishingLoadBalancer.cs                      | 22 +++++++++-----
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 34 ++++++++++++++--------
 .../SubscriptionLoadBalancer.cs                    | 20 +++++++++----
 4 files changed, 69 insertions(+), 30 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index de1c4821..da7e8d03 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -115,6 +115,21 @@ namespace Org.Apache.Rocketmq
             return new Proto::NotifyClientTerminationRequest();
         }
 
+        private PublishingLoadBalancer UpdatePublishingLoadBalancer(string topic, TopicRouteData topicRouteData)
+        {
+            if (_publishingRouteDataCache.TryGetValue(topic, out var publishingLoadBalancer))
+            {
+                publishingLoadBalancer = publishingLoadBalancer.Update(topicRouteData);
+            }
+            else
+            {
+                publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+            }
+
+            _publishingRouteDataCache[topic] = publishingLoadBalancer;
+            return publishingLoadBalancer;
+        }
+
         private async Task<PublishingLoadBalancer> GetPublishingLoadBalancer(string topic)
         {
             if (_publishingRouteDataCache.TryGetValue(topic, out var publishingLoadBalancer))
@@ -123,16 +138,12 @@ namespace Org.Apache.Rocketmq
             }
 
             var topicRouteData = await GetRouteData(topic);
-            publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
-            _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
-
-            return publishingLoadBalancer;
+            return UpdatePublishingLoadBalancer(topic, topicRouteData);
         }
 
         protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
         {
-            var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
-            _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
+            UpdatePublishingLoadBalancer(topic, topicRouteData);
         }
 
         private IRetryPolicy GetRetryPolicy()
diff --git a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
index 2133af5a..b0cc0d2b 100644
--- a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
@@ -18,28 +18,36 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 
 namespace Org.Apache.Rocketmq
 {
     public class PublishingLoadBalancer
     {
+        private static readonly Random Random = new();
+
         private readonly List<MessageQueue> _messageQueues;
+        private int _index;
 
-        // TODO
-        private int _roundRobinIndex;
+        public PublishingLoadBalancer(TopicRouteData topicRouteData) : this(Random.Next(), topicRouteData)
+        {
+        }
 
-        public PublishingLoadBalancer(TopicRouteData route)
+        private PublishingLoadBalancer(int index, TopicRouteData topicRouteData)
         {
+            _index = index;
             _messageQueues = new List<MessageQueue>();
-            foreach (var mq in route.MessageQueues.Where(messageQueue =>
+            foreach (var mq in topicRouteData.MessageQueues.Where(messageQueue =>
                          PermissionHelper.IsWritable(messageQueue.Permission) &&
                          Utilities.MasterBrokerId == messageQueue.Broker.Id))
             {
                 _messageQueues.Add(mq);
             }
+        }
 
-            var random = new Random();
-            _roundRobinIndex = random.Next(0, _messageQueues.Count);
+        internal PublishingLoadBalancer Update(TopicRouteData topicRouteData)
+        {
+            return new PublishingLoadBalancer(_index, topicRouteData);
         }
 
 
@@ -52,7 +60,7 @@ namespace Org.Apache.Rocketmq
 
         public List<MessageQueue> TakeMessageQueues(HashSet<Endpoints> excluded, int count)
         {
-            var next = ++_roundRobinIndex;
+            var next = Interlocked.Increment(ref _index);
             var candidates = new List<MessageQueue>();
             var candidateBrokerNames = new HashSet<string>();
 
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 017e7665..7e54087d 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq
             }
 
             await GetSubscriptionLoadBalancer(topic);
-            _subscriptionExpressions.TryAdd(topic, filterExpression);
+            _subscriptionExpressions[topic] = filterExpression;
         }
 
         public void Unsubscribe(string topic)
@@ -142,15 +142,19 @@ namespace Org.Apache.Rocketmq
             };
         }
 
-        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+        private SubscriptionLoadBalancer UpdateSubscriptionLoadBalancer(string topic, TopicRouteData topicRouteData)
         {
-            var subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
-            _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
-        }
+            if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
+            {
+                subscriptionLoadBalancer = subscriptionLoadBalancer.Update(topicRouteData);
+            }
+            else
+            {
+                subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
+            }
 
-        internal override Settings GetSettings()
-        {
-            return _simpleSubscriptionSettings;
+            _subscriptionRouteDataCache[topic] = subscriptionLoadBalancer;
+            return subscriptionLoadBalancer;
         }
 
         private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
@@ -161,13 +165,19 @@ namespace Org.Apache.Rocketmq
             }
 
             var topicRouteData = await GetRouteData(topic);
-            subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
-            _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
-
-            return subscriptionLoadBalancer;
+            return UpdateSubscriptionLoadBalancer(topic, topicRouteData);
         }
 
+        protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+        {
+            UpdateSubscriptionLoadBalancer(topic, topicRouteData);
+        }
 
+        internal override Settings GetSettings()
+        {
+            return _simpleSubscriptionSettings;
+        }
+        
         public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
         {
             if (State.Running != State)
diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
index a8b1963e..e2575a4b 100644
--- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
@@ -18,30 +18,40 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 
 namespace Org.Apache.Rocketmq
 {
     internal sealed class SubscriptionLoadBalancer
     {
+        private static readonly Random Random = new();
+
         private readonly List<MessageQueue> _messageQueues;
-        private int _roundRobinIndex;
+        private int _index;
+
+        public SubscriptionLoadBalancer(TopicRouteData topicRouteData) : this(Random.Next(), topicRouteData)
+        {
+        }
 
-        public SubscriptionLoadBalancer(TopicRouteData topicRouteData)
+        private SubscriptionLoadBalancer(int index, TopicRouteData topicRouteData)
         {
+            _index = index;
             _messageQueues = new List<MessageQueue>();
             foreach (var mq in topicRouteData.MessageQueues.Where(mq => PermissionHelper.IsReadable(mq.Permission))
                          .Where(mq => Utilities.MasterBrokerId == mq.Broker.Id))
             {
                 _messageQueues.Add(mq);
             }
+        }
 
-            var random = new Random();
-            _roundRobinIndex = random.Next(0, _messageQueues.Count);
+        internal SubscriptionLoadBalancer Update(TopicRouteData topicRouteData)
+        {
+            return new SubscriptionLoadBalancer(_index, topicRouteData);
         }
 
         public MessageQueue TakeMessageQueue()
         {
-            var next = ++_roundRobinIndex;
+            var next = Interlocked.Increment(ref _index);
             var index = Utilities.GetPositiveMod(next, _messageQueues.Count);
             return _messageQueues[index];
         }


[rocketmq-clients] 02/02: Fix code style

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 6766a8ff3a25425b99765186504df6c2dd649344
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Mar 14 16:51:41 2023 +0800

    Fix code style
---
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 7e54087d..2146e643 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -177,7 +177,7 @@ namespace Org.Apache.Rocketmq
         {
             return _simpleSubscriptionSettings;
         }
-        
+
         public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
         {
             if (State.Running != State)