You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 05:24:57 UTC
[rocketmq-clients] branch csharp_dev updated: Clean up code
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/csharp_dev by this push:
new c3233f1 Clean up code
c3233f1 is described below
commit c3233f131d99d86d21d0e1b73b01c787a5179802
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Sep 1 13:24:47 2022 +0800
Clean up code
---
csharp/examples/Program.cs | 44 ++++--
csharp/rocketmq-client-csharp/Client.cs | 183 ++++++++++++++----------
csharp/rocketmq-client-csharp/IClient.cs | 4 +-
csharp/rocketmq-client-csharp/Session.cs | 39 +++--
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 181 +++++++++++++----------
5 files changed, 262 insertions(+), 189 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index e0d3851..c0dd734 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -23,11 +23,13 @@ namespace examples
{
class Program
{
- private const string accessUrl = "rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
- private const string standardTopic = "sdk_standard";
- private const string fifoTopic = "sdk_fifo";
- private const string timedTopic = "sdk_timed";
- private const string transactionalTopic = "sdk_transactional";
+ private const string ACCESS_URL = "rmq-cn-tl32uly8x0n.cn-hangzhou.rmq.aliyuncs.com:8080";
+ private const string STANDARD_TOPIC = "sdk_standard";
+ private const string FIFO_TOPIC = "sdk_fifo";
+ private const string TIMED_TOPIC = "sdk_timed";
+ private const string TRANSACTIONAL_TOPIC = "sdk_transactional";
+
+ private const string CONCURRENT_GROUP = "sdk_concurrency";
private static async Task<SendReceipt> SendStandardMessage(Producer producer)
{
@@ -40,7 +42,7 @@ namespace examples
"k2"
};
- var msg = new Message(standardTopic, body)
+ var msg = new Message(STANDARD_TOPIC, body)
{
// Tag the message. A message has at most one tag.
Tag = "Tag-0",
@@ -63,7 +65,7 @@ namespace examples
"k2"
};
- var msg = new Message(fifoTopic, body)
+ var msg = new Message(FIFO_TOPIC, body)
{
// Tag the message. A message has at most one tag.
Tag = "Tag-0",
@@ -89,7 +91,7 @@ namespace examples
"k2"
};
- var msg = new Message(timedTopic, body)
+ var msg = new Message(TIMED_TOPIC, body)
{
// Tag the message. A message has at most one tag.
Tag = "Tag-0",
@@ -100,16 +102,30 @@ namespace examples
msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30);
return await producer.Send(msg);
}
+
+ private static async Task ConsumeMessage(SimpleConsumer simpleConsumer)
+ {
+ var messages = await simpleConsumer.Receive(32, TimeSpan.FromSeconds(60));
+ if (null != messages)
+ {
+ foreach (var message in messages)
+ {
+ Console.WriteLine($"Receive a message, topic={message.Topic}, message-id={message.MessageId}");
+ }
+ }
+ }
static async Task Main(string[] args)
{
var credentialsProvider = new ConfigFileCredentialsProvider();
- var producer = new Producer(accessUrl);
- producer.CredentialsProvider = credentialsProvider;
- producer.AddTopicOfInterest(standardTopic);
- producer.AddTopicOfInterest(fifoTopic);
- producer.AddTopicOfInterest(timedTopic);
- producer.AddTopicOfInterest(transactionalTopic);
+ var producer = new Producer(ACCESS_URL)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ producer.AddTopicOfInterest(STANDARD_TOPIC);
+ producer.AddTopicOfInterest(FIFO_TOPIC);
+ producer.AddTopicOfInterest(TIMED_TOPIC);
+ producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
await producer.Start();
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 0fe4ec9..4b4884d 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -22,7 +22,7 @@ using System.Threading;
using System.Diagnostics;
using System;
using rmq = Apache.Rocketmq.V2;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using NLog;
using System.Diagnostics.Metrics;
@@ -37,34 +37,39 @@ namespace Org.Apache.Rocketmq
AccessPoint = new AccessPoint(accessUrl);
AccessPointScheme = AccessPoint.HostScheme();
- var serviceEndpoint = new rmq::Address();
- serviceEndpoint.Host = AccessPoint.Host;
- serviceEndpoint.Port = AccessPoint.Port;
+ var serviceEndpoint = new rmq::Address
+ {
+ Host = AccessPoint.Host,
+ Port = AccessPoint.Port
+ };
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
_resourceNamespace = "";
- ClientSettings = new rmq::Settings();
+ ClientSettings = new rmq::Settings
+ {
+ AccessPoint = new rmq::Endpoints
+ {
+ Scheme = rmq::AddressScheme.Ipv4
+ }
+ };
- ClientSettings.AccessPoint = new rmq::Endpoints();
- ClientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
ClientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
ClientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
- ClientSettings.UserAgent = new rmq.UA();
- ClientSettings.UserAgent.Language = rmq::Language.DotNet;
- ClientSettings.UserAgent.Version = "5.0.0";
- ClientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
- ClientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+ ClientSettings.UserAgent = new rmq.UA
+ {
+ Language = rmq::Language.DotNet,
+ Version = "5.0.0",
+ Platform = Environment.OSVersion.ToString(),
+ Hostname = System.Net.Dns.GetHostName()
+ };
_manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
-
- _healthCheckCts = new CancellationTokenSource();
-
_telemetryCts = new CancellationTokenSource();
}
@@ -95,7 +100,7 @@ namespace Org.Apache.Rocketmq
await _manager.Shutdown();
}
- protected string FilterBroker(Func<string, bool> acceptor)
+ private string FilterBroker(Func<string, bool> acceptor)
{
foreach (var item in _topicRouteTable)
{
@@ -188,7 +193,7 @@ namespace Org.Apache.Rocketmq
}
}
- public void Schedule(Action action, int seconds, CancellationToken token)
+ protected void Schedule(Action action, int seconds, CancellationToken token)
{
if (null == action)
{
@@ -213,7 +218,7 @@ namespace Org.Apache.Rocketmq
* direct
* Indicate if we should by-pass cache and fetch route entries from name server.
*/
- public async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
+ protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
{
if (!direct && _topicRouteTable.ContainsKey(topic))
{
@@ -221,12 +226,18 @@ namespace Org.Apache.Rocketmq
}
// We got one or more name servers available.
- var request = new rmq::QueryRouteRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = AccessPointScheme;
+ var request = new rmq::QueryRouteRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ Endpoints = new rmq::Endpoints
+ {
+ Scheme = AccessPointScheme
+ }
+ };
foreach (var address in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(address);
@@ -238,21 +249,17 @@ namespace Org.Apache.Rocketmq
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
- TopicRouteData topicRouteData;
try
{
Logger.Debug($"Resolving route for topic={topic}");
- topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
+ var topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name server");
_topicRouteTable.TryAdd(topic, topicRouteData);
return topicRouteData;
}
- else
- {
- Logger.Warn($"Failed to query route of {topic} from {target}");
- }
+ Logger.Warn($"Failed to query route of {topic} from {target}");
}
catch (Exception e)
{
@@ -273,7 +280,11 @@ namespace Org.Apache.Rocketmq
return;
}
- var request = new rmq::HeartbeatRequest();
+ var request = new rmq::HeartbeatRequest
+ {
+ Group = null,
+ ClientType = rmq.ClientType.Unspecified
+ };
PrepareHeartbeatData(request);
var metadata = new grpc::Metadata();
@@ -303,15 +314,24 @@ namespace Org.Apache.Rocketmq
{
// Pick a broker randomly
string target = FilterBroker((s) => true);
- var request = new rmq::QueryAssignmentRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = AccessPointScheme;
+ var request = new rmq::QueryAssignmentRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Endpoints = new rmq::Endpoints
+ {
+ Scheme = AccessPointScheme
+ }
+ };
+
foreach (var endpoint in AccessPointEndpoints)
{
request.Endpoints.Addresses.Add(endpoint);
@@ -344,12 +364,12 @@ namespace Org.Apache.Rocketmq
settings.MergeFrom(ClientSettings);
}
- public void CreateSession(string url)
+ private void CreateSession(string url)
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
var stream = _manager.Telemetry(url, metadata);
- var session = new Session(url, stream, this);
+ var session = new Session(stream, this);
_sessions.TryAdd(url, session);
Task.Run(async () =>
{
@@ -358,34 +378,45 @@ namespace Org.Apache.Rocketmq
}
- public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+ internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
{
var targetUrl = TargetUrl(assignment);
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- var request = new rmq::ReceiveMessageRequest();
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
- request.MessageQueue = assignment.MessageQueue;
+ var request = new rmq::ReceiveMessageRequest
+ {
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ MessageQueue = assignment.MessageQueue
+ };
var messages = await _manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
return messages;
}
public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
{
- var request = new rmq::AckMessageRequest();
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
-
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
-
- var entry = new rmq::AckMessageEntry();
- entry.ReceiptHandle = receiptHandle;
- entry.MessageId = messageId;
+ var request = new rmq::AckMessageRequest
+ {
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ }
+ };
+
+ var entry = new rmq::AckMessageEntry
+ {
+ ReceiptHandle = receiptHandle,
+ MessageId = messageId
+ };
request.Entries.Add(entry);
var metadata = new grpc::Metadata();
@@ -395,17 +426,21 @@ namespace Org.Apache.Rocketmq
public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
{
- var request = new rmq::ChangeInvisibleDurationRequest();
- request.ReceiptHandle = receiptHandle;
- request.Group = new rmq::Resource();
- request.Group.ResourceNamespace = _resourceNamespace;
- request.Group.Name = group;
-
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = _resourceNamespace;
- request.Topic.Name = topic;
-
- request.MessageId = messageId;
+ var request = new rmq::ChangeInvisibleDurationRequest
+ {
+ ReceiptHandle = receiptHandle,
+ Group = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = group
+ },
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = _resourceNamespace,
+ Name = topic
+ },
+ MessageId = messageId
+ };
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
@@ -439,7 +474,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- public virtual void OnSettingsReceived(rmq::Settings settings)
+ internal virtual void OnSettingsReceived(rmq::Settings settings)
{
if (null != settings.Metric)
{
@@ -465,9 +500,7 @@ namespace Org.Apache.Rocketmq
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
-
- private readonly CancellationTokenSource _healthCheckCts;
-
+
private readonly CancellationTokenSource _telemetryCts;
public CancellationTokenSource TelemetryCts()
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index 3352028..a96e940 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -30,9 +30,7 @@ namespace Org.Apache.Rocketmq
Task<bool> NotifyClientTermination();
void BuildClientSetting(rmq::Settings settings);
-
-
- void OnSettingsReceived(rmq::Settings settings);
+
CancellationTokenSource TelemetryCts();
}
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index a6be057..d2011bf 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -18,7 +18,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using NLog;
using rmq = Apache.Rocketmq.V2;
@@ -28,22 +28,22 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public Session(string target,
- grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
- IClient client)
+ public Session(grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+ Client client)
{
- this._target = target;
- this._stream = stream;
- this._client = client;
- this._channel = Channel.CreateUnbounded<bool>();
+ _stream = stream;
+ _client = client;
+ _channel = Channel.CreateUnbounded<bool>();
}
public async Task Loop()
{
- var reader = this._stream.ResponseStream;
- var writer = this._stream.RequestStream;
- var request = new rmq::TelemetryCommand();
- request.Settings = new rmq::Settings();
+ var reader = _stream.ResponseStream;
+ var writer = _stream.RequestStream;
+ var request = new rmq::TelemetryCommand
+ {
+ Settings = new rmq::Settings()
+ };
_client.BuildClientSetting(request.Settings);
await writer.WriteAsync(request);
Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
@@ -94,13 +94,6 @@ namespace Org.Apache.Rocketmq
await writer.CompleteAsync();
}
- private string _target;
-
- public string Target
- {
- get { return _target; }
- }
-
public async Task AwaitSettingNegotiationCompletion()
{
if (0 != Interlocked.Read(ref _established))
@@ -112,11 +105,11 @@ namespace Org.Apache.Rocketmq
await _channel.Reader.ReadAsync();
}
- private grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
- private IClient _client;
+ private readonly grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> _stream;
+ private readonly Client _client;
- private long _established = 0;
+ private long _established;
- private Channel<bool> _channel;
+ private readonly Channel<bool> _channel;
};
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index d4694ac..c2f4d58 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -33,7 +33,6 @@ namespace Org.Apache.Rocketmq
public SimpleConsumer(string accessUrl, string group)
: base(accessUrl)
{
- _fifo = false;
_subscriptions = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
_topicAssignments = new ConcurrentDictionary<string, List<rmq.Assignment>>();
_group = group;
@@ -44,10 +43,14 @@ namespace Org.Apache.Rocketmq
base.BuildClientSetting(settings);
settings.ClientType = rmq::ClientType.SimpleConsumer;
- settings.Subscription = new rmq::Subscription();
- settings.Subscription.Group = new rmq::Resource();
- settings.Subscription.Group.Name = _group;
- settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+ settings.Subscription = new rmq::Subscription
+ {
+ Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ }
+ };
foreach (var kv in _subscriptions)
{
@@ -87,20 +90,30 @@ namespace Org.Apache.Rocketmq
List<string> topics = new List<string>();
foreach (var sub in _subscriptions)
{
- var request = new rmq::QueryAssignmentRequest();
- request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = sub.Key;
+ var request = new rmq::QueryAssignmentRequest
+ {
+ Topic = new rmq::Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = sub.Key
+ }
+ };
topics.Add(sub.Key);
- request.Group = new rmq::Resource();
- request.Group.Name = _group;
- request.Group.ResourceNamespace = ResourceNamespace;
-
- request.Endpoints = new rmq::Endpoints();
- request.Endpoints.Scheme = rmq.AddressScheme.Ipv4;
- var address = new rmq::Address();
- address.Host = AccessPoint.Host;
- address.Port = AccessPoint.Port;
+ request.Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ };
+
+ request.Endpoints = new rmq::Endpoints
+ {
+ Scheme = rmq.AddressScheme.Ipv4
+ };
+ var address = new rmq::Address
+ {
+ Host = AccessPoint.Host,
+ Port = AccessPoint.Port
+ };
request.Endpoints.Addresses.Add(address);
var metadata = new Metadata();
@@ -129,31 +142,39 @@ namespace Org.Apache.Rocketmq
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
{
request.ClientType = rmq::ClientType.SimpleConsumer;
- request.Group = new rmq::Resource();
- request.Group.Name = _group;
- request.Group.ResourceNamespace = ResourceNamespace;
+ request.Group = new rmq::Resource
+ {
+ Name = _group,
+ ResourceNamespace = ResourceNamespace
+ };
}
public void Subscribe(string topic, rmq::FilterType filterType, string expression)
{
- var entry = new rmq::SubscriptionEntry();
- entry.Topic = new rmq::Resource();
- entry.Topic.Name = topic;
- entry.Topic.ResourceNamespace = ResourceNamespace;
- entry.Expression = new rmq::FilterExpression();
- entry.Expression.Type = filterType;
- entry.Expression.Expression = expression;
+ var entry = new rmq::SubscriptionEntry
+ {
+ Topic = new rmq::Resource
+ {
+ Name = topic,
+ ResourceNamespace = ResourceNamespace
+ },
+ Expression = new rmq::FilterExpression
+ {
+ Type = filterType,
+ Expression = expression
+ }
+ };
+
_subscriptions.AddOrUpdate(topic, entry, (k, prev) => entry);
AddTopicOfInterest(topic);
}
- public override void OnSettingsReceived(rmq.Settings settings)
+ internal override void OnSettingsReceived(rmq.Settings settings)
{
base.OnSettingsReceived(settings);
if (settings.Subscription.Fifo)
{
- _fifo = true;
Logger.Info($"#OnSettingsReceived: Group {_group} is FIFO");
}
}
@@ -167,12 +188,16 @@ namespace Org.Apache.Rocketmq
return new List<Message>();
}
- var request = new rmq.ReceiveMessageRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
+ var request = new rmq.ReceiveMessageRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ MessageQueue = new rmq.MessageQueue()
+ };
- request.MessageQueue = new rmq.MessageQueue();
request.MessageQueue.MergeFrom(messageQueue);
request.BatchSize = batchSize;
@@ -189,15 +214,20 @@ namespace Org.Apache.Rocketmq
public async Task Ack(Message message)
{
- var request = new rmq.AckMessageRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
-
- request.Topic = new rmq.Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = message.Topic;
-
+ var request = new rmq.AckMessageRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ Topic = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = message.Topic
+ }
+ };
+
var entry = new rmq.AckMessageEntry();
request.Entries.Add(entry);
entry.MessageId = message.MessageId;
@@ -211,19 +241,22 @@ namespace Org.Apache.Rocketmq
public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
{
- var request = new rmq.ChangeInvisibleDurationRequest();
- request.Group = new rmq.Resource();
- request.Group.ResourceNamespace = ResourceNamespace;
- request.Group.Name = _group;
-
- request.Topic = new rmq.Resource();
- request.Topic.ResourceNamespace = ResourceNamespace;
- request.Topic.Name = message.Topic;
-
- request.ReceiptHandle = message._receiptHandle;
- request.MessageId = message.MessageId;
-
- request.InvisibleDuration = Duration.FromTimeSpan(invisibleDuration);
+ var request = new rmq.ChangeInvisibleDurationRequest
+ {
+ Group = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = _group
+ },
+ Topic = new rmq.Resource
+ {
+ ResourceNamespace = ResourceNamespace,
+ Name = message.Topic
+ },
+ ReceiptHandle = message._receiptHandle,
+ MessageId = message.MessageId,
+ InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+ };
var targetUrl = message._sourceHost;
var metadata = new Metadata();
@@ -238,35 +271,35 @@ namespace Org.Apache.Rocketmq
return null;
}
- UInt32 topicSeq = CurrentTopicSequence.Value;
- CurrentTopicSequence.Value = topicSeq + 1;
+ var topicSeq = _currentTopicSequence.Value;
+ _currentTopicSequence.Value = topicSeq + 1;
var total = _topicAssignments.Count;
var topicIndex = topicSeq % total;
var topic = _topicAssignments.Keys.Skip((int)topicIndex).First();
- UInt32 queueSeq = CurrentQueueSequence.Value;
- CurrentQueueSequence.Value = queueSeq + 1;
- List<rmq.Assignment> assignments;
- if (_topicAssignments.TryGetValue(topic, out assignments))
+ UInt32 queueSeq = _currentQueueSequence.Value;
+ _currentQueueSequence.Value = queueSeq + 1;
+ if (!_topicAssignments.TryGetValue(topic, out var assignments))
{
- if (null == assignments)
- {
- return null;
- }
- var idx = queueSeq % assignments.Count;
- return assignments[(int)idx].MessageQueue;
-
+ return null;
}
- return null;
+ var idx = queueSeq % assignments?.Count;
+ return assignments?[(int)idx].MessageQueue;
}
- private ThreadLocal<UInt32> CurrentTopicSequence = new ThreadLocal<UInt32>(true);
- private ThreadLocal<UInt32> CurrentQueueSequence = new ThreadLocal<UInt32>(true);
+ private readonly ThreadLocal<UInt32> _currentTopicSequence = new ThreadLocal<UInt32>(true)
+ {
+ Value = 0
+ };
+
+ private readonly ThreadLocal<UInt32> _currentQueueSequence = new ThreadLocal<UInt32>(true)
+ {
+ Value = 0
+ };
private readonly string _group;
- private bool _fifo;
private readonly ConcurrentDictionary<string, rmq::SubscriptionEntry> _subscriptions;
private readonly ConcurrentDictionary<string, List<rmq.Assignment>> _topicAssignments;
private readonly CancellationTokenSource _scanAssignmentCts = new CancellationTokenSource();