You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/20 02:20:05 UTC

[rocketmq-clients] 02/04: Align dotnet driver with other drivers regarding FIFO message handling

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit e1bf271fcc914706bb7f400674a407da9e1d0672
Author: colprog <co...@gmail.com>
AuthorDate: Sat Dec 17 03:19:26 2022 +0800

    Align dotnet driver with other drivers regarding FIFO message handling
---
 csharp/rocketmq-client-csharp/Producer.cs            | 14 ++++++++------
 csharp/rocketmq-client-csharp/PublishLoadBalancer.cs |  9 ++++++++-
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index c2c5e5db..77016571 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
         {
             _topicsOfInterest.Add(message.Topic);
 
-            if (!_loadBalancer.ContainsKey(message.Topic))
+            if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb))
             {
                 var topicRouteData = await GetRouteFor(message.Topic, false);
                 if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
@@ -116,12 +116,10 @@ namespace Org.Apache.Rocketmq
                     throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
                 }
 
-                var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
-                _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+                publishLb = new PublishLoadBalancer(topicRouteData);
+                _loadBalancer.TryAdd(message.Topic, publishLb);
             }
 
-            var publishLb = _loadBalancer[message.Topic];
-
             var request = new rmq::SendMessageRequest();
             var entry = new rmq::Message
             {
@@ -176,7 +174,7 @@ namespace Org.Apache.Rocketmq
             }
 
             List<string> targets = new List<string>();
-            List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
+            List<rmq::MessageQueue> candidates = publishLb.Select(message.MessageGroup, message.MaxAttemptTimes);
             foreach (var messageQueue in candidates)
             {
                 targets.Add(Utilities.TargetUrl(messageQueue));
@@ -205,6 +203,10 @@ namespace Org.Apache.Rocketmq
                         
                         return new SendReceipt(messageId);
                     }
+                    else if (response?.Status is not null)
+                    {
+                        Logger.Warn($"Send failed with code: {response.Status.Code}, error: {response.Status.Message}");
+                    }
                 }
                 catch (Exception e)
                 {
diff --git a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
index 7d258b4f..6670beb6 100644
--- a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -85,7 +85,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public List<rmq::MessageQueue> Select(int maxAttemptTimes)
+        public List<rmq::MessageQueue> Select(string messageGroup, int maxAttemptTimes)
         {
             List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
 
@@ -94,6 +94,13 @@ namespace Org.Apache.Rocketmq
             {
                 return result;
             }
+
+            if (!string.IsNullOrEmpty(messageGroup))
+            {
+                result.Add(all[messageGroup.GetHashCode() % all.Count]);
+                return result;
+            }
+
             int start = ++_roundRobinIndex;
             int found = 0;