You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:48 UTC
[rocketmq-clients] 08/28: Bugfix: ICollection is read-only
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 8f7418ed80b62ac163b20ea33aaa0058fa9279fe
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 13 17:27:23 2023 +0800
Bugfix: ICollection is read-only
---
csharp/examples/ProducerBenchmark.cs | 32 +++++++++++++++++++++++++
csharp/rocketmq-client-csharp/Client.cs | 7 ++++--
csharp/rocketmq-client-csharp/IClient.cs | 2 +-
csharp/rocketmq-client-csharp/Producer.cs | 19 +++++++--------
csharp/rocketmq-client-csharp/Session.cs | 5 ++--
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 27 ++++++++++++++-------
6 files changed, 67 insertions(+), 25 deletions(-)
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
new file mode 100644
index 00000000..361aa95d
--- /dev/null
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+using NLog;
+using Org.Apache.Rocketmq;
+
+namespace examples
+{
+ public class ProducerBenchmark
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+ internal static async Task QuickStart()
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 3b260022..133fed00 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -333,7 +333,7 @@ namespace Org.Apache.Rocketmq
return _telemetryCts;
}
- public abstract Proto.Settings GetSettings();
+ public abstract Settings GetSettings();
public string GetClientId()
{
@@ -358,6 +358,9 @@ namespace Org.Apache.Rocketmq
{
}
- public abstract void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings);
+ public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+ {
+ GetSettings().Sync(settings);
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index 5ba4c6f1..fc4c0127 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq
ClientConfig GetClientConfig();
- Proto.Settings GetSettings();
+ Settings GetSettings();
/// <summary>
/// Get the identifier of current client.
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 62387a98..6a9040ec 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -30,6 +30,7 @@ namespace Org.Apache.Rocketmq
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
private readonly PublishingSettings _publishingSettings;
+ private readonly ConcurrentDictionary<string, bool> _publishingTopics;
public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
@@ -41,20 +42,21 @@ namespace Org.Apache.Rocketmq
{
}
- private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
- base(clientConfig, topics.Keys)
+ private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics, int maxAttempts) :
+ base(clientConfig, publishingTopics.Keys)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
_publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
- clientConfig.RequestTimeout, topics);
+ clientConfig.RequestTimeout, publishingTopics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
+ _publishingTopics = publishingTopics;
}
public void SetTopics(params string[] topics)
{
foreach (var topic in topics)
{
- Topics.Add(topic);
+ _publishingTopics.TryAdd(topic, true);
}
}
@@ -183,14 +185,9 @@ namespace Org.Apache.Rocketmq
}
}
- public override Proto.Settings GetSettings()
+ public override Settings GetSettings()
{
- return _publishingSettings.ToProtobuf();
- }
-
- public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
- {
- _publishingSettings.Sync(settings);
+ return _publishingSettings;
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index 99d61268..dd3da7bd 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
@@ -63,9 +62,9 @@ namespace Org.Apache.Rocketmq
var writer = _streamingCall.RequestStream;
// await readTask;
var settings = _client.GetSettings();
- Proto.TelemetryCommand telemetryCommand = new Proto.TelemetryCommand
+ var telemetryCommand = new Proto.TelemetryCommand
{
- Settings = settings
+ Settings = settings.ToProtobuf()
};
await writer.WriteAsync(telemetryCommand);
// await writer.CompleteAsync();
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 1a0f0ec2..cb380d89 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -77,17 +94,11 @@ namespace Org.Apache.Rocketmq
_subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
}
- public override Proto.Settings GetSettings()
+ public override Settings GetSettings()
{
- return _simpleSubscriptionSettings.ToProtobuf();
+ return _simpleSubscriptionSettings;
}
- public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
- {
- _simpleSubscriptionSettings.Sync(settings);
- }
-
-
private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
{
if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))