You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/20 02:20:05 UTC
[rocketmq-clients] 02/04: Align dotnet driver with other drivers regarding FIFO message handling
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit e1bf271fcc914706bb7f400674a407da9e1d0672
Author: colprog <co...@gmail.com>
AuthorDate: Sat Dec 17 03:19:26 2022 +0800
Align dotnet driver with other drivers regarding FIFO message handling
---
csharp/rocketmq-client-csharp/Producer.cs | 14 ++++++++------
csharp/rocketmq-client-csharp/PublishLoadBalancer.cs | 9 ++++++++-
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index c2c5e5db..77016571 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
{
_topicsOfInterest.Add(message.Topic);
- if (!_loadBalancer.ContainsKey(message.Topic))
+ if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb))
{
var topicRouteData = await GetRouteFor(message.Topic, false);
if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
@@ -116,12 +116,10 @@ namespace Org.Apache.Rocketmq
throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
}
- var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
- _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+ publishLb = new PublishLoadBalancer(topicRouteData);
+ _loadBalancer.TryAdd(message.Topic, publishLb);
}
- var publishLb = _loadBalancer[message.Topic];
-
var request = new rmq::SendMessageRequest();
var entry = new rmq::Message
{
@@ -176,7 +174,7 @@ namespace Org.Apache.Rocketmq
}
List<string> targets = new List<string>();
- List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
+ List<rmq::MessageQueue> candidates = publishLb.Select(message.MessageGroup, message.MaxAttemptTimes);
foreach (var messageQueue in candidates)
{
targets.Add(Utilities.TargetUrl(messageQueue));
@@ -205,6 +203,10 @@ namespace Org.Apache.Rocketmq
return new SendReceipt(messageId);
}
+ else if (response?.Status is not null)
+ {
+ Logger.Warn($"Send failed with code: {response.Status.Code}, error: {response.Status.Message}");
+ }
}
catch (Exception e)
{
diff --git a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
index 7d258b4f..6670beb6 100644
--- a/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -85,7 +85,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- public List<rmq::MessageQueue> Select(int maxAttemptTimes)
+ public List<rmq::MessageQueue> Select(string messageGroup, int maxAttemptTimes)
{
List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
@@ -94,6 +94,13 @@ namespace Org.Apache.Rocketmq
{
return result;
}
+
+ if (!string.IsNullOrEmpty(messageGroup))
+ {
+ result.Add(all[messageGroup.GetHashCode() % all.Count]);
+ return result;
+ }
+
int start = ++_roundRobinIndex;
int found = 0;