You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/12/23 09:25:03 UTC
[rocketmq-clients] 01/04: Add missing unsubscribe in dotnet SimpleConsumer
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8f4c0c4d99decaabcc6f69941a8d5960dddb1595
Author: colprog <co...@gmail.com>
AuthorDate: Tue Dec 20 23:12:49 2022 +0800
Add missing unsubscribe in dotnet SimpleConsumer
---
csharp/rocketmq-client-csharp/Client.cs | 15 ++++++++-------
csharp/rocketmq-client-csharp/Producer.cs | 4 ++--
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 6 ++++++
3 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 217479d9..bd9fdef1 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -148,11 +148,7 @@ namespace Org.Apache.Rocketmq
private async Task UpdateTopicRoute()
{
- HashSet<string> topics = new HashSet<string>();
- foreach (var topic in _topicsOfInterest)
- {
- topics.Add(topic);
- }
+ HashSet<string> topics = new HashSet<string>(_topicsOfInterest.Keys);
foreach (var item in _topicRouteTable)
{
@@ -518,11 +514,16 @@ namespace Org.Apache.Rocketmq
protected readonly IClientManager Manager;
- protected readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
+ protected readonly ConcurrentDictionary<string, bool> _topicsOfInterest = new ();
public void AddTopicOfInterest(string topic)
{
- _topicsOfInterest.Add(topic);
+ _topicsOfInterest.TryAdd(topic, true);
+ }
+
+ public void RemoveTopicOfInterest(string topic)
+ {
+ _topicsOfInterest.TryRemove(topic, out var _);
}
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 77016571..39e39b8e 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq
{
var resource = new rmq.Resource()
{
- Name = topic,
+ Name = topic.Key,
ResourceNamespace = ResourceNamespace
};
publishing.Topics.Add(resource);
@@ -105,7 +105,7 @@ namespace Org.Apache.Rocketmq
public async Task<SendReceipt> Send(Message message)
{
- _topicsOfInterest.Add(message.Topic);
+ _topicsOfInterest.TryAdd(message.Topic, true);
if (!_loadBalancer.TryGetValue(message.Topic, out var publishLb))
{
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 12e5d8cf..efe38211 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.Rocketmq
AddTopicOfInterest(topic);
}
+ public void Unsubscribe(string topic)
+ {
+ _subscriptions.TryRemove(topic, out var _);
+ RemoveTopicOfInterest(topic);
+ }
+
internal override void OnSettingsReceived(rmq.Settings settings)
{
base.OnSettingsReceived(settings);