You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:48 UTC

[rocketmq-clients] 08/28: Bugfix: ICollection is read-only

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 8f7418ed80b62ac163b20ea33aaa0058fa9279fe
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 13 17:27:23 2023 +0800

    Bugfix: ICollection is read-only
---
 csharp/examples/ProducerBenchmark.cs            | 32 +++++++++++++++++++++++++
 csharp/rocketmq-client-csharp/Client.cs         |  7 ++++--
 csharp/rocketmq-client-csharp/IClient.cs        |  2 +-
 csharp/rocketmq-client-csharp/Producer.cs       | 19 +++++++--------
 csharp/rocketmq-client-csharp/Session.cs        |  5 ++--
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 27 ++++++++++++++-------
 6 files changed, 67 insertions(+), 25 deletions(-)

diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
new file mode 100644
index 00000000..361aa95d
--- /dev/null
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+using NLog;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+    public class ProducerBenchmark
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+        internal static async Task QuickStart()
+        {
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 3b260022..133fed00 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -333,7 +333,7 @@ namespace Org.Apache.Rocketmq
             return _telemetryCts;
         }
 
-        public abstract Proto.Settings GetSettings();
+        public abstract Settings GetSettings();
 
         public string GetClientId()
         {
@@ -358,6 +358,9 @@ namespace Org.Apache.Rocketmq
         {
         }
 
-        public abstract void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings);
+        public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+        {
+            GetSettings().Sync(settings);
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index 5ba4c6f1..fc4c0127 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq
 
         ClientConfig GetClientConfig();
 
-        Proto.Settings GetSettings();
+        Settings GetSettings();
 
         /// <summary>
         /// Get the identifier of current client. 
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 62387a98..6a9040ec 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -30,6 +30,7 @@ namespace Org.Apache.Rocketmq
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
         private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
         private readonly PublishingSettings _publishingSettings;
+        private readonly ConcurrentDictionary<string, bool> _publishingTopics;
 
 
         public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
@@ -41,20 +42,21 @@ namespace Org.Apache.Rocketmq
         {
         }
 
-        private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
-            base(clientConfig, topics.Keys)
+        private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, int maxAttempts) :
+            base(clientConfig, publishingTopics.Keys)
         {
             var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
             _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
-                clientConfig.RequestTimeout, topics);
+                clientConfig.RequestTimeout, publishingTopics);
             _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
+            _publishingTopics = publishingTopics;
         }
 
         public void SetTopics(params string[] topics)
         {
             foreach (var topic in topics)
             {
-                Topics.Add(topic);
+                _publishingTopics.TryAdd(topic, true);
             }
         }
 
@@ -183,14 +185,9 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public override Proto.Settings GetSettings()
+        public override Settings GetSettings()
         {
-            return _publishingSettings.ToProtobuf();
-        }
-
-        public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
-        {
-            _publishingSettings.Sync(settings);
+            return _publishingSettings;
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index 99d61268..dd3da7bd 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 
-using System;
 using System.Threading;
 using System.Threading.Channels;
 using System.Threading.Tasks;
@@ -63,9 +62,9 @@ namespace Org.Apache.Rocketmq
                 var writer = _streamingCall.RequestStream;
                 // await readTask;
                 var settings = _client.GetSettings();
-                Proto.TelemetryCommand telemetryCommand = new Proto.TelemetryCommand
+                var telemetryCommand = new Proto.TelemetryCommand
                 {
-                    Settings = settings
+                    Settings = settings.ToProtobuf()
                 };
                 await writer.WriteAsync(telemetryCommand);
                 // await writer.CompleteAsync();
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 1a0f0ec2..cb380d89 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
@@ -77,17 +94,11 @@ namespace Org.Apache.Rocketmq
             _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
         }
 
-        public override Proto.Settings GetSettings()
+        public override Settings GetSettings()
         {
-            return _simpleSubscriptionSettings.ToProtobuf();
+            return _simpleSubscriptionSettings;
         }
 
-        public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
-        {
-            _simpleSubscriptionSettings.Sync(settings);
-        }
-
-
         private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
         {
             if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))