You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/28 03:20:34 UTC
[rocketmq-clients] 02/02: Apply builder pattern
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit dd411737e1503b090db9d7b64e271a10ae2e690c
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Feb 28 10:10:35 2023 +0800
Apply builder pattern
---
csharp/examples/ProducerBenchmark.cs | 80 +++++++++---------
csharp/examples/ProducerDelayMessageExample.cs | 36 ++++----
csharp/examples/ProducerFifoMessageExample.cs | 37 ++++----
csharp/examples/ProducerNormalMessageExample.cs | 37 ++++----
.../examples/ProducerTransactionMessageExample.cs | 32 +++----
csharp/examples/QuickStart.cs | 2 +-
csharp/examples/SimpleConsumerExample.cs | 18 ++--
csharp/rocketmq-client-csharp/ClientConfig.cs | 40 +++++++--
csharp/rocketmq-client-csharp/Message.cs | 93 ++++++++++++++------
.../{ClientConfig.cs => Preconditions.cs} | 17 ++--
csharp/rocketmq-client-csharp/Producer.cs | 83 ++++++++++++------
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 51 ++++++++++-
csharp/tests/MessageTest.cs | 98 ++--------------------
13 files changed, 348 insertions(+), 276 deletions(-)
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index a114f879..dc5d372f 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -30,51 +30,16 @@ namespace examples
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private static readonly SemaphoreSlim Semaphore = new(0);
+ private const int TpsLimit = 1;
private static long _counter = 0;
- internal static void QuickStart()
+ private static void DoStats()
{
- const string accessKey = "amKhwEM40L61znSz";
- const string secretKey = "bT6c3gpF3EFB10F3";
-
- // Credential provider is optional for client configuration.
- var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
- // In most case, you don't need to create too many producers, single pattern is recommended.
- var producer = new Producer(clientConfig);
-
- const string topic = "lingchu_normal_topic";
- producer.SetTopics(topic);
- // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // the topic route before message publishing.
- producer.Start().Wait();
- // Define your message body.
- var bytes = Encoding.UTF8.GetBytes("foobar");
- const string tag = "yourMessageTagA";
- // You could set multiple keys for the single message.
- var keys = new List<string>
- {
- "yourMessageKey-7044358f98fc",
- "yourMessageKey-f72539fbc246"
- };
- // Set topic for current message.
- var message = new Message(topic, bytes)
- {
- Tag = tag,
- Keys = keys
- };
-
- const int tpsLimit = 1;
-
Task.Run(async () =>
{
while (true)
{
- Semaphore.Release(tpsLimit);
+ Semaphore.Release(TpsLimit);
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
@@ -87,7 +52,46 @@ namespace examples
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
+ }
+
+ internal static async Task QuickStart()
+ {
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
+
+ // Credential provider is optional for client configuration.
+ var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+ const string endpoints = "foobar.com:8080";
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
+ const string topic = "yourNormalTopic";
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ await using var producer = await new Producer.Builder()
+ // Set the topic name(s), which is optional but recommended.
+ // It makes producer could prefetch the topic route before message publishing.
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .Build();
+
+ // Define your message body.
+ var bytes = Encoding.UTF8.GetBytes("foobar");
+ const string tag = "yourMessageTagA";
+ // You could set multiple keys for the single message.
+ var keys = new List<string>
+ {
+ "yourMessageKey-7044358f98fc",
+ "yourMessageKey-f72539fbc246"
+ };
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(bytes)
+ .SetTag(tag)
+ .SetKeys(keys)
+ .Build();
+ DoStats();
var tasks = new List<Task>();
while (true)
{
diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index edb356a5..31a40be7 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -35,18 +35,18 @@ namespace examples
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
- // In most case, you don't need to create too many producers, single pattern is recommended.
- var producer = new Producer(clientConfig);
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
const string topic = "yourDelayTopic";
- // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // the topic route before message publishing.
- producer.SetTopics(topic);
-
- await producer.Start();
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ await using var producer = await new Producer.Builder()
+ // Set the topic name(s), which is optional but recommended.
+ // It makes producer could prefetch the topic route before message publishing.
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .Build();
// Define your message body.
var bytes = Encoding.UTF8.GetBytes("foobar");
const string tag = "yourMessageTagA";
@@ -56,14 +56,12 @@ namespace examples
"yourMessageKey-2f00df144e48",
"yourMessageKey-49df1dd332b7"
};
- // Set topic for current message.
- var message = new Message(topic, bytes)
- {
- Tag = tag,
- Keys = keys,
- // Essential for DELAY message.
- DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30)
- };
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(bytes)
+ .SetTag(tag)
+ .SetKeys(keys)
+ .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(30)).Build();
var sendReceipt = await producer.Send(message);
Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
// Close the producer if you don't need it anymore.
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index bfca32f5..9a1bdf76 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -36,18 +36,18 @@ namespace examples
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
- // In most case, you don't need to create too many producers, single pattern is recommended.
- var producer = new Producer(clientConfig);
-
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
const string topic = "yourFifoTopic";
- producer.SetTopics(topic);
- // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // the topic route before message publishing.
- await producer.Start();
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ await using var producer = await new Producer.Builder()
+ // Set the topic name(s), which is optional but recommended.
+ // It makes producer could prefetch the topic route before message publishing.
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .Build();
// Define your message body.
var bytes = Encoding.UTF8.GetBytes("foobar");
const string tag = "yourMessageTagA";
@@ -58,14 +58,13 @@ namespace examples
"yourMessageKey-f72539fbc246"
};
const string messageGroup = "yourMessageGroup";
- // Set topic for current message.
- var message = new Message(topic, bytes)
- {
- Tag = tag,
- Keys = keys,
- // Set message group for FIFO message.
- MessageGroup = messageGroup
- };
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(bytes)
+ .SetTag(tag)
+ .SetKeys(keys)
+ .SetMessageGroup(messageGroup)
+ .Build();
var sendReceipt = await producer.Send(message);
Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
Thread.Sleep(9999999);
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 09e4dff1..258886e2 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -17,7 +17,6 @@
using System.Collections.Generic;
using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using NLog;
using Org.Apache.Rocketmq;
@@ -36,18 +35,18 @@ namespace examples
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
- // In most case, you don't need to create too many producers, single pattern is recommended.
- var producer = new Producer(clientConfig);
-
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
const string topic = "yourNormalTopic";
- producer.SetTopics(topic);
- // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // the topic route before message publishing.
- await producer.Start();
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ await using var producer = await new Producer.Builder()
+ // Set the topic name(s), which is optional but recommended.
+ // It makes producer could prefetch the topic route before message publishing.
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .Build();
// Define your message body.
var bytes = Encoding.UTF8.GetBytes("foobar");
const string tag = "yourMessageTagA";
@@ -58,16 +57,14 @@ namespace examples
"yourMessageKey-f72539fbc246"
};
// Set topic for current message.
- var message = new Message(topic, bytes)
- {
- Tag = tag,
- Keys = keys
- };
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(bytes)
+ .SetTag(tag)
+ .SetKeys(keys)
+ .Build();
var sendReceipt = await producer.Send(message);
Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
- Thread.Sleep(9999999);
- // Close the producer if you don't need it anymore.
- await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index 10b61142..06cb3995 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -42,16 +42,20 @@ namespace examples
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
- // In most case, you don't need to create too many producers, single pattern is recommended.
- var producer = new Producer(clientConfig);
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
const string topic = "yourTransactionTopic";
- producer.SetTopics(topic);
- producer.SetTransactionChecker(new TransactionChecker());
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ await using var producer = await new Producer.Builder()
+ // Set the topic name(s), which is optional but recommended.
+ // It makes producer could prefetch the topic route before message publishing.
+ .SetTopics(topic)
+ .SetClientConfig(clientConfig)
+ .SetTransactionChecker(new TransactionChecker())
+ .Build();
await producer.Start();
var transaction = producer.BeginTransaction();
@@ -64,12 +68,12 @@ namespace examples
"yourMessageKey-7044358f98fc",
"yourMessageKey-f72539fbc246"
};
- // Set topic for current message.
- var message = new Message(topic, bytes)
- {
- Tag = tag,
- Keys = keys
- };
+ var message = new Message.Builder()
+ .SetTopic(topic)
+ .SetBody(bytes)
+ .SetTag(tag)
+ .SetKeys(keys)
+ .Build();
var sendReceipt = await producer.Send(message, transaction);
Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
// Commit the transaction.
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 8323218f..c1915035 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -31,7 +31,7 @@ namespace examples
// await ProducerFifoMessageExample.QuickStart();
// await ProducerDelayMessageExample.QuickStart();
// await SimpleConsumerExample.QuickStart();
- ProducerBenchmark.QuickStart();
+ ProducerBenchmark.QuickStart().Wait();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs
index fa78e845..c153c544 100644
--- a/csharp/examples/SimpleConsumerExample.cs
+++ b/csharp/examples/SimpleConsumerExample.cs
@@ -35,18 +35,21 @@ namespace examples
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
- var clientConfig = new ClientConfig(endpoints)
- {
- CredentialsProvider = credentialsProvider
- };
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoints)
+ .SetCredentialsProvider(credentialsProvider)
+ .Build();
// Add your subscriptions.
const string consumerGroup = "yourConsumerGroup";
const string topic = "yourTopic";
var subscription = new Dictionary<string, FilterExpression>
{ { topic, new FilterExpression("*") } };
// In most case, you don't need to create too many consumers, single pattern is recommended.
- var simpleConsumer =
- new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription);
+ await using var simpleConsumer = new SimpleConsumer.Builder()
+ .SetClientConfig(clientConfig).SetConsumerGroup(consumerGroup)
+ .SetAwaitDuration(TimeSpan.FromSeconds(15))
+ .SetSubscriptionExpression(subscription)
+ .Build();
await simpleConsumer.Start();
var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
@@ -58,9 +61,6 @@ namespace examples
// await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(15));
// Logger.Info($"Changing message invisible duration successfully, message=id={message.MessageId}");
}
-
- // Close the consumer if you don't need it anymore.
- await simpleConsumer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 82ee8403..f6fa4de0 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -16,22 +16,52 @@
*/
using System;
-using System.Threading;
namespace Org.Apache.Rocketmq
{
public class ClientConfig : IClientConfig
{
- public ClientConfig(string endpoints)
+ private ClientConfig(ICredentialsProvider credentialsProvider, TimeSpan requestTimeout, string endpoints)
{
- RequestTimeout = TimeSpan.FromSeconds(3);
+ CredentialsProvider = credentialsProvider;
+ RequestTimeout = requestTimeout;
Endpoints = endpoints;
}
- public ICredentialsProvider CredentialsProvider { get; set; }
+ public ICredentialsProvider CredentialsProvider { get; }
- public TimeSpan RequestTimeout { get; set; }
+ public TimeSpan RequestTimeout { get; }
public string Endpoints { get; }
+
+ public class Builder
+ {
+ private ICredentialsProvider _credentialsProvider;
+ private TimeSpan _requestTimeout = TimeSpan.FromSeconds(3);
+ private string _endpoints;
+
+ public Builder SetCredentialsProvider(ICredentialsProvider credentialsProvider)
+ {
+ _credentialsProvider = credentialsProvider;
+ return this;
+ }
+
+ public Builder SetRequestTimeout(TimeSpan requestTimeout)
+ {
+ _requestTimeout = requestTimeout;
+ return this;
+ }
+
+ public Builder SetEndpoints(string endpoints)
+ {
+ _endpoints = endpoints;
+ return this;
+ }
+
+ public ClientConfig Build()
+ {
+ return new ClientConfig(_credentialsProvider, _requestTimeout, _endpoints);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs
index 19e868fc..fe2dee79 100644
--- a/csharp/rocketmq-client-csharp/Message.cs
+++ b/csharp/rocketmq-client-csharp/Message.cs
@@ -22,26 +22,16 @@ namespace Org.Apache.Rocketmq
{
public class Message
{
- public Message() : this(null, null)
- {
- }
-
- public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body)
- {
- }
-
- public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
- {
- }
-
- public Message(string topic, string tag, List<string> keys, byte[] body)
+ private Message(string topic, byte[] body, string tag, List<string> keys,
+ Dictionary<string, string> userProperties, DateTime? deliveryTimestamp, string messageGroup)
{
Topic = topic;
Tag = tag;
Keys = keys;
Body = body;
- UserProperties = new Dictionary<string, string>();
- DeliveryTimestamp = null;
+ UserProperties = userProperties;
+ DeliveryTimestamp = deliveryTimestamp;
+ MessageGroup = messageGroup;
}
internal Message(Message message)
@@ -50,25 +40,80 @@ namespace Org.Apache.Rocketmq
Tag = message.Tag;
Keys = message.Keys;
Body = message.Body;
- MessageGroup = message.MessageGroup;
UserProperties = message.UserProperties;
+ MessageGroup = message.MessageGroup;
DeliveryTimestamp = message.DeliveryTimestamp;
}
- public string Topic { get; set; }
+ public string Topic { get; }
+
+ public byte[] Body { get; }
+
+ public string Tag { get; }
+
+ public List<string> Keys { get; }
+ public Dictionary<string, string> UserProperties { get; }
+
+ public DateTime? DeliveryTimestamp { get; }
- public byte[] Body { get; set; }
+ public string MessageGroup { get; }
- public string Tag { get; set; }
+ public class Builder
+ {
+ private string _topic;
+ private byte[] _body;
+ private string _tag;
+ private List<string> _keys = new();
+ private Dictionary<string, string> _userProperties = new();
+ private DateTime? _deliveryTimestamp;
+ private string _messageGroup;
+
+ public Builder SetTopic(string topic)
+ {
+ _topic = topic;
+ return this;
+ }
+
+ public Builder SetBody(byte[] body)
+ {
+ _body = body;
+ return this;
+ }
- public List<string> Keys { get; set; }
- public Dictionary<string, string> UserProperties { get; set; }
+ public Builder SetTag(string tag)
+ {
+ _tag = tag;
+ return this;
+ }
+ public Builder SetKeys(List<string> keys)
+ {
+ _keys = keys;
+ return this;
+ }
- public DateTime? DeliveryTimestamp { get; set; }
+ public Builder SetUserProperties(Dictionary<string, string> userProperties)
+ {
+ _userProperties = userProperties;
+ return this;
+ }
- public int DeliveryAttempt { get; internal set; }
+ public Builder SetDeliveryTimestamp(DateTime deliveryTimestamp)
+ {
+ _deliveryTimestamp = deliveryTimestamp;
+ return this;
+ }
- public string MessageGroup { get; set; }
+ public Builder SetMessageGroup(string messageGroup)
+ {
+ _messageGroup = messageGroup;
+ return this;
+ }
+
+ public Message Build()
+ {
+ return new Message(_topic, _body, _tag, _keys, _userProperties, _deliveryTimestamp, _messageGroup);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/Preconditions.cs
similarity index 70%
copy from csharp/rocketmq-client-csharp/ClientConfig.cs
copy to csharp/rocketmq-client-csharp/Preconditions.cs
index 82ee8403..235f81b6 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/Preconditions.cs
@@ -16,22 +16,17 @@
*/
using System;
-using System.Threading;
namespace Org.Apache.Rocketmq
{
- public class ClientConfig : IClientConfig
+ public static class Preconditions
{
- public ClientConfig(string endpoints)
+ public static void CheckArgument(bool condition, string message)
{
- RequestTimeout = TimeSpan.FromSeconds(3);
- Endpoints = endpoints;
+ if (!condition)
+ {
+ throw new ArgumentException(message);
+ }
}
-
- public ICredentialsProvider CredentialsProvider { get; set; }
-
- public TimeSpan RequestTimeout { get; set; }
-
- public string Endpoints { get; }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 81d2b8d7..c1e411a8 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -27,28 +27,18 @@ using NLog;
namespace Org.Apache.Rocketmq
{
- public class Producer : Client
+ public class Producer : Client, IAsyncDisposable, IDisposable
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
internal readonly PublishingSettings PublishingSettings;
private readonly ConcurrentDictionary<string, bool> _publishingTopics;
- private ITransactionChecker _checker = null;
+ private readonly ITransactionChecker _checker;
private readonly Histogram<double> _sendCostTimeHistogram;
- public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
- {
- }
-
- public Producer(ClientConfig clientConfig, int maxAttempts) : this(clientConfig,
- new ConcurrentDictionary<string, bool>(), maxAttempts)
- {
- }
-
private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
- int maxAttempts) :
- base(clientConfig)
+ int maxAttempts, ITransactionChecker checker) : base(clientConfig)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
PublishingSettings = new PublishingSettings(ClientId, Endpoints, retryPolicy,
@@ -57,18 +47,6 @@ namespace Org.Apache.Rocketmq
_publishingTopics = publishingTopics;
_sendCostTimeHistogram =
ClientMeterManager.Meter.CreateHistogram<double>(MetricConstant.SendCostTimeMetricName, "milliseconds");
- }
-
- public void SetTopics(params string[] topics)
- {
- foreach (var topic in topics)
- {
- _publishingTopics.TryAdd(topic, true);
- }
- }
-
- public void SetTransactionChecker(ITransactionChecker checker)
- {
_checker = checker;
}
@@ -94,6 +72,18 @@ namespace Org.Apache.Rocketmq
}
}
+ public async ValueTask DisposeAsync()
+ {
+ await Shutdown().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose()
+ {
+ Shutdown().Wait();
+ GC.SuppressFinalize(this);
+ }
+
public override async Task Shutdown()
{
try
@@ -330,5 +320,48 @@ namespace Org.Apache.Rocketmq
var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
StatusChecker.Check(response.Status, request);
}
+
+ public class Builder
+ {
+ private ClientConfig _clientConfig;
+ private readonly ConcurrentDictionary<string, bool> _publishingTopics = new();
+ private int _maxAttempts = 3;
+ private ITransactionChecker _checker;
+
+ public Builder SetClientConfig(ClientConfig clientConfig)
+ {
+ _clientConfig = clientConfig;
+ return this;
+ }
+
+ public Builder SetTopics(params string[] topics)
+ {
+ foreach (var topic in topics)
+ {
+ _publishingTopics[topic] = true;
+ }
+
+ return this;
+ }
+
+ public Builder SetMaxAttempts(int maxAttempts)
+ {
+ _maxAttempts = maxAttempts;
+ return this;
+ }
+
+ public Builder SetTransactionChecker(ITransactionChecker checker)
+ {
+ _checker = checker;
+ return this;
+ }
+
+ public async Task<Producer> Build()
+ {
+ var producer = new Producer(_clientConfig, _publishingTopics, _maxAttempts, _checker);
+ await producer.Start();
+ return producer;
+ }
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index dc5b61c7..97fe407c 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -27,7 +27,7 @@ using Org.Apache.Rocketmq.Error;
namespace Org.Apache.Rocketmq
{
- public class SimpleConsumer : Consumer
+ public class SimpleConsumer : Consumer, IAsyncDisposable, IDisposable
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache;
@@ -91,6 +91,18 @@ namespace Org.Apache.Rocketmq
}
}
+ public async ValueTask DisposeAsync()
+ {
+ await Shutdown().ConfigureAwait(false);
+ GC.SuppressFinalize(this);
+ }
+
+ public void Dispose()
+ {
+ Shutdown().Wait();
+ GC.SuppressFinalize(this);
+ }
+
public override async Task Shutdown()
{
try
@@ -256,5 +268,42 @@ namespace Org.Apache.Rocketmq
Name = ConsumerGroup
};
}
+
+ public class Builder
+ {
+ private ClientConfig _clientConfig;
+ private string _consumerGroup;
+ private TimeSpan _awaitDuration;
+ private ConcurrentDictionary<string, FilterExpression> _subscriptionExpressions;
+
+ public Builder SetClientConfig(ClientConfig clientConfig)
+ {
+ _clientConfig = clientConfig;
+ return this;
+ }
+
+ public Builder SetConsumerGroup(string consumerGroup)
+ {
+ _consumerGroup = consumerGroup;
+ return this;
+ }
+
+ public Builder SetAwaitDuration(TimeSpan awaitDuration)
+ {
+ _awaitDuration = awaitDuration;
+ return this;
+ }
+
+ public Builder SetSubscriptionExpression(Dictionary<string, FilterExpression> subscriptionExpressions)
+ {
+ _subscriptionExpressions = new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions);
+ return this;
+ }
+
+ public SimpleConsumer Build()
+ {
+ return new SimpleConsumer(_clientConfig, _consumerGroup, _awaitDuration, _subscriptionExpressions);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/csharp/tests/MessageTest.cs b/csharp/tests/MessageTest.cs
index 9bb0f66c..6ba00b8e 100644
--- a/csharp/tests/MessageTest.cs
+++ b/csharp/tests/MessageTest.cs
@@ -14,106 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Microsoft.VisualStudio.TestTools.UnitTesting;
+
using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Text;
-using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
[TestClass]
public class MessageTest
{
-
- [TestMethod]
- public void testCtor()
- {
- var msg1 = new Message();
- Assert.IsNull(msg1.Topic);
- Assert.IsNull(msg1.Body);
- Assert.IsNull(msg1.Tag);
- Assert.AreEqual(msg1.Keys.Count, 0);
- Assert.AreEqual(msg1.UserProperties.Count, 0);
- }
-
- [TestMethod]
- public void testCtor2()
- {
- string topic = "T1";
- string bodyString = "body";
- byte[] body = Encoding.ASCII.GetBytes(bodyString);
- var msg1 = new Message(topic, body);
- Assert.AreEqual(msg1.Topic, topic);
- Assert.AreEqual(msg1.Body, body);
- Assert.IsNull(msg1.Tag);
- Assert.AreEqual(msg1.Keys.Count, 0);
- Assert.AreEqual(msg1.UserProperties.Count, 0);
- }
-
- [TestMethod]
- public void testCtor3()
- {
- string topic = "T1";
- string bodyString = "body";
- byte[] body = Encoding.ASCII.GetBytes(bodyString);
- string tag = "TagA";
- var msg1 = new Message(topic, tag, body);
- Assert.AreEqual(msg1.Topic, topic);
- Assert.AreEqual(msg1.Body, body);
- Assert.AreEqual(msg1.Tag, tag);
- Assert.AreEqual(msg1.Keys.Count, 0);
- Assert.AreEqual(msg1.UserProperties.Count, 0);
- }
-
- [TestMethod]
- public void testCtor4()
- {
- string topic = "T1";
- string bodyString = "body";
- byte[] body = Encoding.ASCII.GetBytes(bodyString);
- string tag = "TagA";
- List<string> keys = new List<string>();
- keys.Add("Key1");
- keys.Add("Key2");
-
- var msg1 = new Message(topic, tag, keys, body);
- Assert.AreEqual(msg1.Topic, topic);
- Assert.AreEqual(msg1.Body, body);
- Assert.AreEqual(msg1.Tag, tag);
- Assert.AreEqual(msg1.Keys, keys);
- Assert.AreEqual(msg1.UserProperties.Count, 0);
- }
-
[TestMethod]
- public void testCtor5()
+ [ExpectedException(typeof(ArgumentException))]
+ public void TestIllegalTopic0()
{
- string topic = "T1";
- string bodyString = "body";
- byte[] body = Encoding.ASCII.GetBytes(bodyString);
- string tag = "TagA";
- List<string> keys = new List<string>();
- keys.Add("Key1");
- keys.Add("Key2");
-
- var msg1 = new Message(topic, tag, keys, body);
-
- msg1.UserProperties.Add("k", "v");
- msg1.UserProperties.Add("k2", "v2");
-
- Assert.AreEqual(msg1.Topic, topic);
- Assert.AreEqual(msg1.Body, body);
- Assert.AreEqual(msg1.Tag, tag);
- Assert.AreEqual(msg1.Keys, keys);
- Assert.AreEqual(msg1.UserProperties.Count, 2);
-
- string value;
- msg1.UserProperties.TryGetValue("k", out value);
- Assert.AreEqual(value, "v");
-
- msg1.UserProperties.TryGetValue("k2", out value);
- Assert.AreEqual(value, "v2");
-
+ // const string topic = "\t\n";
+ // const string bodyString = "body";
+ // var body = Encoding.ASCII.GetBytes(bodyString);
+ // var _ = new Message(topic, body);
}
-
}
}
\ No newline at end of file