You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/27 14:05:11 UTC

[rocketmq-clients] branch master updated (423c8248 -> 12ef8269)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


    from 423c8248 Update csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
     new d957e524 Fix SimpleConsumer with multiple topics
     new cd456333 Align dotnet Producer on BornTimestamp/QueueId with other languages
     new 12ef8269 dotnet: allow changing SimpleConsumer polling time

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 csharp/rocketmq-client-csharp/Producer.cs          | 48 ++++++++--------------
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 36 ++++++----------
 .../SubscriptionLoadBalancer.cs}                   | 41 +++++++++---------
 3 files changed, 52 insertions(+), 73 deletions(-)
 copy csharp/{tests/SequenceGeneratorTest.cs => rocketmq-client-csharp/SubscriptionLoadBalancer.cs} (52%)


[rocketmq-clients] 03/03: dotnet: allow changing SimpleConsumer polling time

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 12ef82697b098ad61a7c17da1fdd3db020c808a6
Author: colprog <co...@gmail.com>
AuthorDate: Sun Dec 25 04:06:46 2022 +0800

    dotnet: allow changing SimpleConsumer polling time
---
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 5dd3a03b..ff92044f 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -196,7 +196,7 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public async Task<List<Message>> Receive(int batchSize, TimeSpan invisibleDuration)
+        public async Task<List<Message>> Receive(int batchSize, TimeSpan invisibleDuration, TimeSpan? awaitDuration = null)
         {
             var messageQueue = NextQueue();
             if (null == messageQueue)
@@ -225,8 +225,7 @@ namespace Org.Apache.Rocketmq
             var metadata = new Metadata();
             Signature.Sign(this, metadata);
 
-            var timeout = ClientSettings.Subscription.LongPollingTimeout
-                .ToTimeSpan()
+            var timeout = (awaitDuration ?? ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan())
                 .Add(this.RequestTimeout);
 
             return await Manager.ReceiveMessage(targetUrl, metadata, request, timeout);


[rocketmq-clients] 01/03: Fix SimpleConsumer with multiple topics

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit d957e524325ac8164ed3b25e100ee099a526d33f
Author: colprog <co...@gmail.com>
AuthorDate: Fri Dec 23 02:02:28 2022 +0800

    Fix SimpleConsumer with multiple topics
    
    The load-balancing code in SimpleConsumer is faulty: with two topics
    that have the same number of queues, the topic index and the queue
    index are incremented simultaneously, so some queues will never be
    polled. Also implemented a simple delay if NextQueue returns null, to
    avoid potential starvation.
---
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    | 31 +++++--------
 .../SubscriptionLoadBalancer.cs                    | 52 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 20 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index efe38211..5dd3a03b 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -33,8 +33,8 @@ namespace Org.Apache.Rocketmq
         public SimpleConsumer(string accessUrl, string group)
         : base(accessUrl)
         {
-            _subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
-            _topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
+            _subscriptions = new();
+            _topicAssignments = new();
             _group = group;
         }
 
@@ -133,16 +133,16 @@ namespace Org.Apache.Rocketmq
             var i = 0;
             foreach (var assignments in list)
             {
-                string topic = topics[i];
+                string topic = topics[i++];
                 if (null == assignments || 0 == assignments.Count)
                 {
                     Logger.Warn($"Faild to acquire assignments. Topic={topic}, Group={_group}");
-                    ++i;
                     continue;
                 }
+
                 Logger.Debug($"Assignments received. Topic={topic}, Group={_group}");
-                _topicAssignments.AddOrUpdate(topic, assignments, (t, prev) => assignments);
-                ++i;
+                var newSubscriptionLB = new SubscriptionLoadBalancer(assignments);
+                _topicAssignments.AddOrUpdate(topic, newSubscriptionLB, (t, prev) => prev.Update(assignments));
             }
         }
 
@@ -201,8 +201,7 @@ namespace Org.Apache.Rocketmq
             var messageQueue = NextQueue();
             if (null == messageQueue)
             {
-                Logger.Debug("NextQueue returned null");
-                return new List<Message>();
+                throw new TopicRouteException("No topic to receive message from");
             }
 
             var request = new rmq.ReceiveMessageRequest
@@ -299,31 +298,23 @@ namespace Org.Apache.Rocketmq
             var total = _topicAssignments.Count;
             var topicIndex = topicSeq % total;
             var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
-            
-            UInt32 queueSeq = _currentQueueSequence.Value;
-            _currentQueueSequence.Value = queueSeq + 1;
-            if (!_topicAssignments.TryGetValue(topic, out var assignments))
+
+            if (!_topicAssignments.TryGetValue(topic, out var subscriptionLB))
             {
                 return null;
             }
 
-            var idx = queueSeq % assignments?.Count;
-            return assignments?[(int)idx].MessageQueue;
+            return subscriptionLB.TakeMessageQueue();
         }
 
         private readonly ThreadLocal<UInt32> _currentTopicSequence = new ThreadLocal<UInt32>(true)
         {
             Value = 0
         };
-        
-        private readonly ThreadLocal<UInt32> _currentQueueSequence = new ThreadLocal<UInt32>(true)
-        {
-            Value = 0
-        };
 
         private readonly string _group;
         private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
-        private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
+        private readonly ConcurrentDictionary<string, SubscriptionLoadBalancer> _topicAssignments;
         private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
new file mode 100644
index 00000000..cf803377
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using rmq = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    internal sealed class SubscriptionLoadBalancer
+    {
+        public List<rmq.Assignment> Assignments { get; private set; }
+        private uint index = 0;
+
+        public SubscriptionLoadBalancer(List<rmq.Assignment> assignments)
+        {
+            Assignments = assignments;
+        }
+
+        private SubscriptionLoadBalancer(uint oldIndex, List<rmq.Assignment> assignments)
+        {
+            index = oldIndex;
+            Assignments = assignments;
+        }
+
+        public SubscriptionLoadBalancer Update(List<rmq.Assignment> newAssignments)
+        {
+            return new SubscriptionLoadBalancer(index, newAssignments);
+        }
+
+        public rmq.MessageQueue TakeMessageQueue()
+        {
+            var i = Interlocked.Increment(ref index);
+            return Assignments[(int)(i % Assignments.Count)].MessageQueue;
+        }
+    }
+}


[rocketmq-clients] 02/03: Align dotnet Producer on BornTimestamp/QueueId with other languages

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit cd456333b960b6ee4d74572b9959b4ba673f9bb6
Author: colprog <co...@gmail.com>
AuthorDate: Fri Dec 23 03:26:34 2022 +0800

    Align dotnet Producer on BornTimestamp/QueueId with other languages
---
 csharp/rocketmq-client-csharp/Producer.cs | 48 +++++++++++--------------------
 1 file changed, 17 insertions(+), 31 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 39e39b8e..69a4b118 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -128,22 +128,17 @@ namespace Org.Apache.Rocketmq
                 {
                     ResourceNamespace = resourceNamespace(),
                     Name = message.Topic
-                }
+                },
+                UserProperties = { message.UserProperties },
+                SystemProperties = new rmq::SystemProperties
+                {
+                    MessageId = message.MessageId,
+                    MessageType = rmq::MessageType.Normal,
+                    Keys = { message.Keys },
+                },
             };
             request.Messages.Add(entry);
 
-            // User properties
-            foreach (var item in message.UserProperties)
-            {
-                entry.UserProperties.Add(item.Key, item.Value);
-            }
-
-            entry.SystemProperties = new rmq::SystemProperties
-            {
-                MessageId = message.MessageId,
-                MessageType = rmq::MessageType.Normal
-            };
-            
             if (DateTime.MinValue != message.DeliveryTimestamp)
             {
                 entry.SystemProperties.MessageType = rmq::MessageType.Delay;
@@ -154,39 +149,30 @@ namespace Org.Apache.Rocketmq
                     Logger.Warn("A message may not be FIFO and delayed at the same time");
                     throw new MessageException("A message may not be both FIFO and Timed");
                 }
-            } else if (!String.IsNullOrEmpty(message.MessageGroup))
+            }
+            else if (!String.IsNullOrEmpty(message.MessageGroup))
             {
                 entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
                 entry.SystemProperties.MessageGroup = message.MessageGroup;
             }
-            
+
             if (!string.IsNullOrEmpty(message.Tag))
             {
                 entry.SystemProperties.Tag = message.Tag;
             }
 
-            if (0 != message.Keys.Count)
-            {
-                foreach (var key in message.Keys)
-                {
-                    entry.SystemProperties.Keys.Add(key);
-                }
-            }
-
-            List<string> targets = new List<string>();
-            List<rmq::MessageQueue> candidates = publishLb.Select(message.MessageGroup, message.MaxAttemptTimes);
-            foreach (var messageQueue in candidates)
-            {
-                targets.Add(Utilities.TargetUrl(messageQueue));
-            }
-
             var metadata = new Metadata();
             Signature.Sign(this, metadata);
 
             Exception ex = null;
 
-            foreach (var target in targets)
+            var candidates = publishLb.Select(message.MessageGroup, message.MaxAttemptTimes);
+            foreach (var messageQueue in candidates)
             {
+                var target = Utilities.TargetUrl(messageQueue);
+                entry.SystemProperties.BornTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow);
+                entry.SystemProperties.QueueId = messageQueue.Id;
+
                 try
                 {
                     var stopWatch = new Stopwatch();