You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/23 09:25:03 UTC

[rocketmq-clients] 01/04: Add missing unsubscribe in dotnet SimpleConsumer

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 8f4c0c4d99decaabcc6f69941a8d5960dddb1595
Author: colprog <co...@gmail.com>
AuthorDate: Tue Dec 20 23:12:49 2022 +0800

    Add missing unsubscribe in dotnet SimpleConsumer
---
 csharp/rocketmq-client-csharp/Client.cs         | 15 ++++++++-------
 csharp/rocketmq-client-csharp/Producer.cs       |  4 ++--
 csharp/rocketmq-client-csharp/SimpleConsumer.cs |  6 ++++++
 3 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 217479d9..bd9fdef1 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -148,11 +148,7 @@ namespace Org.Apache.Rocketmq
 
         private async Task UpdateTopicRoute()
         {
-            HashSet<string> topics = new HashSet<string>();
-            foreach (var topic in _topicsOfInterest)
-            {
-                topics.Add(topic);
-            }
+            HashSet<string> topics = new HashSet<string>(_topicsOfInterest.Keys);
 
             foreach (var item in _topicRouteTable)
             {
@@ -518,11 +514,16 @@ namespace Org.Apache.Rocketmq
 
         protected readonly IClientManager Manager;
 
-        protected readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
+        protected readonly ConcurrentDictionary<string, bool> _topicsOfInterest = new ();
 
         public void AddTopicOfInterest(string topic)
         {
-            _topicsOfInterest.Add(topic);
+            _topicsOfInterest.TryAdd(topic, true);
+        }
+
+        public void RemoveTopicOfInterest(string topic)
+        {
+            _topicsOfInterest.TryRemove(topic, out var _);
         }
 
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 77016571..39e39b8e 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq
             {
                 var resource = new rmq.Resource()
                 {
-                    Name = topic,
+                    Name = topic.Key,
                     ResourceNamespace = ResourceNamespace
                 };
                 publishing.Topics.Add(resource);
@@ -105,7 +105,7 @@ namespace Org.Apache.Rocketmq
 
         public async Task<SendReceipt> Send(Message message)
         {
-            _topicsOfInterest.Add(message.Topic);
+            _topicsOfInterest.TryAdd(message.Topic, true);
 
             if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb))
             {
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 12e5d8cf..efe38211 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.Rocketmq
             AddTopicOfInterest(topic);
         }
 
+        public void Unsubscribe(string topic)
+        {
+            _subscriptions.TryRemove(topic, out var _);
+            RemoveTopicOfInterest(topic);
+        }
+
         internal override void OnSettingsReceived(rmq.Settings settings)
         {
             base.OnSettingsReceived(settings);