You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/09/01 09:10:11 UTC
[rocketmq-clients] branch csharp_dev updated: WIP: bugfix
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch csharp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/csharp_dev by this push:
new ff2ff51 WIP: bugfix
ff2ff51 is described below
commit ff2ff51f25561acd72a7b1b2f43752abf8aefbfb
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Sep 1 17:10:01 2022 +0800
WIP: bugfix
---
csharp/examples/Program.cs | 46 ++++----
csharp/rocketmq-client-csharp/AccessPoint.cs | 28 ++---
csharp/rocketmq-client-csharp/Client.cs | 83 ++++++++------
csharp/rocketmq-client-csharp/ClientConfig.cs | 24 +----
csharp/rocketmq-client-csharp/IClientConfig.cs | 4 -
csharp/rocketmq-client-csharp/Producer.cs | 4 +-
csharp/rocketmq-client-csharp/RpcClient.cs | 2 +-
csharp/rocketmq-client-csharp/Session.cs | 13 ++-
csharp/rocketmq-client-csharp/Signature.cs | 10 +-
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 40 +++----
csharp/tests/ClientManagerTest.cs | 2 +-
csharp/tests/ProducerTest.cs | 138 ++++++++++++------------
csharp/tests/RpcClientTest.cs | 6 +-
csharp/tests/SignatureTest.cs | 11 +-
14 files changed, 202 insertions(+), 209 deletions(-)
diff --git a/csharp/examples/Program.cs b/csharp/examples/Program.cs
index c15c6f1..f3c6027 100644
--- a/csharp/examples/Program.cs
+++ b/csharp/examples/Program.cs
@@ -123,36 +123,40 @@ namespace examples
static async Task Main(string[] args)
{
var credentialsProvider = new ConfigFileCredentialsProvider();
- var producer = new Producer(ACCESS_URL)
+ // var producer = new Producer(ACCESS_URL)
+ // {
+ // CredentialsProvider = credentialsProvider
+ // };
+ // producer.AddTopicOfInterest(STANDARD_TOPIC);
+ // producer.AddTopicOfInterest(FIFO_TOPIC);
+ // producer.AddTopicOfInterest(TIMED_TOPIC);
+ // producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
+ //
+ // await producer.Start();
+ //
+ // var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
+ // Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
+ //
+ // var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
+ // Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
+ //
+ // var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
+ // Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
+ //
+ // await producer.Shutdown();
+
+ Console.WriteLine("Now start a simple consumer");
+ var simpleConsumer = new SimpleConsumer(ACCESS_URL, CONCURRENT_GROUP)
{
CredentialsProvider = credentialsProvider
};
- producer.AddTopicOfInterest(STANDARD_TOPIC);
- producer.AddTopicOfInterest(FIFO_TOPIC);
- producer.AddTopicOfInterest(TIMED_TOPIC);
- producer.AddTopicOfInterest(TRANSACTIONAL_TOPIC);
-
- await producer.Start();
-
- var sendReceiptOfStandardMessage = await SendStandardMessage(producer);
- Console.WriteLine($"Standard message-id: {sendReceiptOfStandardMessage.MessageId}");
-
- var sendReceiptOfFifoMessage = await SendFifoMessage(producer);
- Console.WriteLine($"FIFO message-id: {sendReceiptOfFifoMessage.MessageId}");
- var sendReceiptOfTimedMessage = await SendTimedMessage(producer);
- Console.WriteLine($"Timed message-id: {sendReceiptOfTimedMessage.MessageId}");
-
- await producer.Shutdown();
-
- Console.WriteLine("Now start a simple consumer");
- var simpleConsumer = new SimpleConsumer(ACCESS_URL, CONCURRENT_GROUP);
simpleConsumer.Subscribe(STANDARD_TOPIC, new FilterExpression("*", ExpressionType.TAG));
await simpleConsumer.Start();
await ConsumeAndAckMessages(simpleConsumer);
- await simpleConsumer.Shutdown();
+ // await simpleConsumer.Shutdown();
Console.ReadKey();
}
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs
index ab29273..f05fa29 100644
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ b/csharp/rocketmq-client-csharp/AccessPoint.cs
@@ -37,35 +37,27 @@ namespace Org.Apache.Rocketmq
throw new ArgumentException("Access url should be of format host:port");
}
- _host = segments[0];
- _port = Int32.Parse(segments[1]);
+ Host = segments[0];
+ Port = Int32.Parse(segments[1]);
}
-
- private string _host;
- public string Host
- {
- get { return _host; }
- set { _host = value; }
- }
+ public string Host { get; }
- private int _port;
+ public int Port { get; set; }
- public int Port
+ public string TargetUrl()
{
- get { return _port; }
- set { _port = value; }
+ return $"https://{Host}:{Port}";
}
- public string TargetUrl()
+ public rmq::AddressScheme HostScheme()
{
- return $"https://{_host}:{_port}";
+ return SchemeOf(Host);
}
- public rmq::AddressScheme HostScheme()
+ private static rmq::AddressScheme SchemeOf(string host)
{
- IPAddress ip;
- bool result = IPAddress.TryParse(_host, out ip);
+ var result = IPAddress.TryParse(host, out var ip);
if (!result)
{
return rmq::AddressScheme.DomainName;
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 4b4884d..675b9eb 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -50,7 +50,7 @@ namespace Org.Apache.Rocketmq
{
AccessPoint = new rmq::Endpoints
{
- Scheme = rmq::AddressScheme.Ipv4
+ Scheme = AccessPoint.HostScheme()
}
};
@@ -66,7 +66,7 @@ namespace Org.Apache.Rocketmq
Hostname = System.Net.Dns.GetHostName()
};
- _manager = new ClientManager();
+ Manager = new ClientManager();
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
@@ -77,27 +77,28 @@ namespace Org.Apache.Rocketmq
{
Schedule(async () =>
{
+ Logger.Debug("Update topic route by schedule");
await UpdateTopicRoute();
}, 30, _updateTopicRouteCts.Token);
// Get routes for topics of interest.
+ Logger.Debug("Step of #Start: get route for topics of interest");
await UpdateTopicRoute();
string accessPointUrl = AccessPoint.TargetUrl();
CreateSession(accessPointUrl);
-
await _sessions[accessPointUrl].AwaitSettingNegotiationCompletion();
-
+ Logger.Debug($"Session has been created for {accessPointUrl}");
await Heartbeat();
}
public virtual async Task Shutdown()
{
- Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
+ Logger.Info($"Shutdown client");
_updateTopicRouteCts.Cancel();
_telemetryCts.Cancel();
- await _manager.Shutdown();
+ await Manager.Shutdown();
}
private string FilterBroker(Func<string, bool> acceptor)
@@ -106,7 +107,7 @@ namespace Org.Apache.Rocketmq
{
foreach (var partition in item.Value.MessageQueues)
{
- string target = Utilities.TargetUrl(partition);
+ var target = Utilities.TargetUrl(partition);
if (acceptor(target))
{
return target;
@@ -121,7 +122,7 @@ namespace Org.Apache.Rocketmq
*/
private List<string> AvailableBrokerEndpoints()
{
- List<string> endpoints = new List<string>();
+ var endpoints = new List<string>();
foreach (var item in _topicRouteTable)
{
foreach (var partition in item.Value.MessageQueues)
@@ -154,7 +155,7 @@ namespace Org.Apache.Rocketmq
List<string> topicList = new List<string>();
topicList.AddRange(topics);
- List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+ var tasks = new List<Task<TopicRouteData>>();
foreach (var item in topicList)
{
tasks.Add(GetRouteFor(item, true));
@@ -220,8 +221,10 @@ namespace Org.Apache.Rocketmq
*/
protected async Task<TopicRouteData> GetRouteFor(string topic, bool direct)
{
+ Logger.Debug($"Get route for topic={topic}, direct={direct}");
if (!direct && _topicRouteTable.ContainsKey(topic))
{
+ Logger.Debug($"Return cached route for {topic}");
return _topicRouteTable[topic];
}
@@ -244,7 +247,7 @@ namespace Org.Apache.Rocketmq
}
var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
int index = _random.Next(0, AccessPointEndpoints.Count);
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
@@ -252,11 +255,12 @@ namespace Org.Apache.Rocketmq
try
{
Logger.Debug($"Resolving route for topic={topic}");
- var topicRouteData = await _manager.ResolveRoute(target, metadata, request, RequestTimeout);
+ var topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name server");
_topicRouteTable.TryAdd(topic, topicRouteData);
+ Logger.Debug($"Got route for {topic} from {target}");
return topicRouteData;
}
Logger.Warn($"Failed to query route of {topic} from {target}");
@@ -288,12 +292,12 @@ namespace Org.Apache.Rocketmq
PrepareHeartbeatData(request);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
- tasks.Add(_manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
+ tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
}
await Task.WhenAll(tasks);
@@ -339,8 +343,8 @@ namespace Org.Apache.Rocketmq
try
{
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
}
catch (System.Exception e)
{
@@ -364,25 +368,22 @@ namespace Org.Apache.Rocketmq
settings.MergeFrom(ClientSettings);
}
- private void CreateSession(string url)
+ private async Task CreateSession(string url)
{
+ Logger.Debug($"Create session for url={url}");
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- var stream = _manager.Telemetry(url, metadata);
- var session = new Session(stream, this);
+ Signature.Sign(this, metadata);
+ var stream = Manager.Telemetry(url, metadata);
+ var session = new Session(url, stream, this);
_sessions.TryAdd(url, session);
- Task.Run(async () =>
- {
- await session.Loop();
- });
+ await session.Loop();
}
-
internal async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
{
var targetUrl = TargetUrl(assignment);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
var request = new rmq::ReceiveMessageRequest
{
Group = new rmq::Resource
@@ -392,7 +393,8 @@ namespace Org.Apache.Rocketmq
},
MessageQueue = assignment.MessageQueue
};
- var messages = await _manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+ var messages = await Manager.ReceiveMessage(targetUrl, metadata, request,
+ ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
return messages;
}
@@ -420,8 +422,8 @@ namespace Org.Apache.Rocketmq
request.Entries.Add(entry);
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.Ack(target, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.Ack(target, metadata, request, RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
@@ -443,8 +445,8 @@ namespace Org.Apache.Rocketmq
};
var metadata = new grpc::Metadata();
- Signature.sign(this, metadata);
- return await _manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
}
public async Task<bool> NotifyClientTermination()
@@ -454,13 +456,13 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
List<Task<Boolean>> tasks = new List<Task<Boolean>>();
foreach (var endpoint in endpoints)
{
- tasks.Add(_manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
+ tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
@@ -487,9 +489,24 @@ namespace Org.Apache.Rocketmq
ClientSettings.BackoffPolicy = new rmq::RetryPolicy();
ClientSettings.BackoffPolicy.MergeFrom(settings.BackoffPolicy);
}
+
+ switch (settings.PubSubCase)
+ {
+ case rmq.Settings.PubSubOneofCase.Publishing:
+ {
+ ClientSettings.Publishing = settings.Publishing;
+ break;
+ }
+
+ case rmq.Settings.PubSubOneofCase.Subscription:
+ {
+ ClientSettings.Subscription = settings.Subscription;
+ break;
+ }
+ }
}
- protected readonly IClientManager _manager;
+ protected readonly IClientManager Manager;
private readonly HashSet<string> _topicsOfInterest = new HashSet<string>();
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index 0d99cb1..86175a2 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -75,15 +75,6 @@ namespace Org.Apache.Rocketmq
set { credentialsProvider_ = value; }
}
- public string tenantId()
- {
- return _tenantId;
- }
- public string TenantId
- {
- set { _tenantId = value; }
- }
-
public TimeSpan RequestTimeout
{
get
@@ -96,15 +87,6 @@ namespace Org.Apache.Rocketmq
}
}
- public TimeSpan getLongPollingTimeout()
- {
- return longPollingIoTimeout_;
- }
- public TimeSpan LongPollingTimeout
- {
- set { longPollingIoTimeout_ = value; }
- }
-
public string getGroupName()
{
return groupName_;
@@ -139,9 +121,7 @@ namespace Org.Apache.Rocketmq
protected string _resourceNamespace;
private ICredentialsProvider credentialsProvider_;
-
- private string _tenantId;
-
+
private TimeSpan _requestTimeout;
private TimeSpan longPollingIoTimeout_;
@@ -150,7 +130,7 @@ namespace Org.Apache.Rocketmq
private string clientId_;
- private bool tracingEnabled_ = false;
+ private bool tracingEnabled_;
private string instanceName_ = "default";
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs
index 438d7a8..57325b4 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -28,10 +28,6 @@ namespace Org.Apache.Rocketmq
ICredentialsProvider credentialsProvider();
- string tenantId();
-
- TimeSpan getLongPollingTimeout();
-
string getGroupName();
string clientId();
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index e337b1a..7d55904 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -161,7 +161,7 @@ namespace Org.Apache.Rocketmq
}
var metadata = new Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
Exception ex = null;
@@ -171,7 +171,7 @@ namespace Org.Apache.Rocketmq
{
var stopWatch = new Stopwatch();
stopWatch.Start();
- rmq::SendMessageResponse response = await _manager.SendMessage(target, metadata, request, RequestTimeout);
+ rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok == response.Status.Code)
{
var messageId = response.Entries[0].MessageId;
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index c1f1cd6..dc9d753 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -76,7 +76,7 @@ namespace Org.Apache.Rocketmq
public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
var callOptions = new CallOptions(metadata, deadline);
return _stub.Telemetry(callOptions);
}
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index d2011bf..4d09894 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -28,9 +28,11 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public Session(grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
+ public Session(string target,
+ grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> stream,
Client client)
{
+ _target = target;
_stream = stream;
_client = client;
_channel = Channel.CreateUnbounded<bool>();
@@ -46,13 +48,13 @@ namespace Org.Apache.Rocketmq
};
_client.BuildClientSetting(request.Settings);
await writer.WriteAsync(request);
- Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
+ Logger.Debug($"Writing Client Settings to {_target} Done: {request.Settings}");
while (!_client.TelemetryCts().IsCancellationRequested)
{
if (await reader.MoveNext(_client.TelemetryCts().Token))
{
var cmd = reader.Current;
- Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
+ Logger.Debug($"Received a TelemetryCommand from {_target}: {cmd}");
switch (cmd.CommandCase)
{
case rmq::TelemetryCommand.CommandOneofCase.None:
@@ -71,7 +73,7 @@ namespace Org.Apache.Rocketmq
await _channel.Writer.WriteAsync(true);
}
- Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+ Logger.Info($"Received settings from {_target}: {cmd.Settings}");
_client.OnSettingsReceived(cmd.Settings);
break;
}
@@ -90,7 +92,7 @@ namespace Org.Apache.Rocketmq
}
}
}
- Logger.Info("Telemetry stream cancelled");
+ Logger.Info($"Telemetry stream for {_target} is cancelled");
await writer.CompleteAsync();
}
@@ -111,5 +113,6 @@ namespace Org.Apache.Rocketmq
private long _established;
private readonly Channel<bool> _channel;
+ private readonly string _target;
};
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 2331b53..ec78171 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,22 +16,18 @@
*/
using System;
using System.Text;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using System.Security.Cryptography;
namespace Org.Apache.Rocketmq
{
- public class Signature
+ public static class Signature
{
- public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+ public static void Sign(IClientConfig clientConfig, grpc::Metadata metadata)
{
metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
- if (!String.IsNullOrEmpty(clientConfig.tenantId()))
- {
- metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
- }
if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
{
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 121e9d3..b9d83ce 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -56,6 +56,7 @@ namespace Org.Apache.Rocketmq
{
settings.Subscription.Subscriptions.Add(kv.Value);
}
+ Logger.Info($"ClientSettings built OK. {settings}");
}
public override async Task Start()
@@ -65,29 +66,31 @@ namespace Org.Apache.Rocketmq
// Scan load assignment periodically
Schedule(async () =>
{
- while (!_scanAssignmentCts.IsCancellationRequested)
- {
- await ScanLoadAssignments();
- }
+ Logger.Debug("Scan load assignments by schedule");
+ await ScanLoadAssignments();
}, 30, _scanAssignmentCts.Token);
await ScanLoadAssignments();
+ Logger.Debug("Step of #Start: ScanLoadAssignments completed");
}
public override async Task Shutdown()
{
+ _scanAssignmentCts.Cancel();
await base.Shutdown();
if (!await NotifyClientTermination())
{
Logger.Warn("Failed to NotifyClientTermination");
}
}
-
+
+ /**
+ * For 5.x, we can assume there is a load-balancer before gateway nodes.
+ */
private async Task ScanLoadAssignments()
{
-
- List<Task<List<rmq::Assignment>>> tasks = new List<Task<List<rmq.Assignment>>>();
- List<string> topics = new List<string>();
+ var tasks = new List<Task<List<rmq.Assignment>>>();
+ var topics = new List<string>();
foreach (var sub in _subscriptions)
{
var request = new rmq::QueryAssignmentRequest
@@ -107,7 +110,7 @@ namespace Org.Apache.Rocketmq
request.Endpoints = new rmq::Endpoints
{
- Scheme = rmq.AddressScheme.Ipv4
+ Scheme = AccessPoint.HostScheme()
};
var address = new rmq::Address
{
@@ -117,12 +120,11 @@ namespace Org.Apache.Rocketmq
request.Endpoints.Addresses.Add(address);
var metadata = new Metadata();
- Signature.sign(this, metadata);
- tasks.Add(_manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
+ Signature.Sign(this, metadata);
+ tasks.Add(Manager.QueryLoadAssignment(AccessPoint.TargetUrl(), metadata, request, TimeSpan.FromSeconds(3)));
}
- List<rmq.Assignment>[] list = await Task.WhenAll(tasks);
-
+ var list = await Task.WhenAll(tasks);
var i = 0;
foreach (var assignments in list)
{
@@ -211,9 +213,9 @@ namespace Org.Apache.Rocketmq
var targetUrl = Utilities.TargetUrl(messageQueue);
var metadata = new Metadata();
- Signature.sign(this, metadata);
+ Signature.Sign(this, metadata);
- return await _manager.ReceiveMessage(targetUrl, metadata, request,
+ return await Manager.ReceiveMessage(targetUrl, metadata, request,
ClientSettings.Subscription.LongPollingTimeout.ToTimeSpan());
}
@@ -241,8 +243,8 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
- Signature.sign(this, metadata);
- await _manager.Ack(targetUrl, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ await Manager.Ack(targetUrl, metadata, request, RequestTimeout);
}
public async Task ChangeInvisibleDuration(Message message, TimeSpan invisibleDuration)
@@ -266,8 +268,8 @@ namespace Org.Apache.Rocketmq
var targetUrl = message._sourceHost;
var metadata = new Metadata();
- Signature.sign(this, metadata);
- await _manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
+ Signature.Sign(this, metadata);
+ await Manager.ChangeInvisibleDuration(targetUrl, metadata, request, RequestTimeout);
}
private rmq.MessageQueue NextQueue()
diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs
index af5983c..e12c027 100644
--- a/csharp/tests/ClientManagerTest.cs
+++ b/csharp/tests/ClientManagerTest.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.Rocketmq
clientConfig.CredentialsProvider = credentialsProvider;
clientConfig.ResourceNamespace = resourceNamespace;
clientConfig.Region = "cn-hangzhou-pre";
- Signature.sign(clientConfig, metadata);
+ Signature.Sign(clientConfig, metadata);
var clientManager = new ClientManager();
string target = "https://116.62.231.199:80";
var topicRouteData = clientManager.ResolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs
index b5094a2..bcf36a5 100644
--- a/csharp/tests/ProducerTest.cs
+++ b/csharp/tests/ProducerTest.cs
@@ -26,30 +26,14 @@ namespace tests
[TestClass]
public class ProducerTest
{
-
- private static AccessPoint _accessPoint;
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- _accessPoint = new AccessPoint
- {
- Host = HOST,
- Port = PORT
- };
- }
-
- [ClassCleanup]
- public static void TearDown()
- {
- }
-
[TestMethod]
public async Task TestLifecycle()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
await producer.Shutdown();
}
@@ -57,21 +41,26 @@ namespace tests
[TestMethod]
public async Task TestSendStandardMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- // Tag the massage. A message has at most one tag.
- msg.Tag = "Tag-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Tag the message. A message has at most one tag.
+ Tag = "Tag-0"
+ };
+
// Associate the message with one or multiple keys
- var keys = new List<string>();
- keys.Add("k1");
- keys.Add("k2");
+ var keys = new List<string>
+ {
+ "k1",
+ "k2"
+ };
msg.Keys = keys;
var sendResult = await producer.Send(msg);
@@ -82,23 +71,28 @@ namespace tests
[TestMethod]
public async Task TestSendMultipleMessages()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
for (var i = 0; i < 128; i++)
{
- var msg = new Message(topic, body);
-
- // Tag the massage. A message has at most one tag.
- msg.Tag = "Tag-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Tag the message. A message has at most one tag.
+ Tag = "Tag-0"
+ };
+
// Associate the message with one or multiple keys
- var keys = new List<string>();
- keys.Add("k1");
- keys.Add("k2");
+ var keys = new List<string>
+ {
+ "k1",
+ "k2"
+ };
msg.Keys = keys;
var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
@@ -109,17 +103,20 @@ namespace tests
[TestMethod]
public async Task TestSendFifoMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- // Messages of the same group will get delivered one after another.
- msg.MessageGroup = "message-group-0";
-
+ var msg = new Message(TOPIC, body)
+ {
+ // Messages of the same group will get delivered one after another.
+ MessageGroup = "message-group-0"
+ };
+
// Verify messages are FIFO iff their message group is not null or empty.
Assert.IsTrue(msg.Fifo());
@@ -131,15 +128,19 @@ namespace tests
[TestMethod]
public async Task TestSendScheduledMessage()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
-
- msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ var msg = new Message(TOPIC, body)
+ {
+ DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+ };
+
Assert.IsTrue(msg.Scheduled());
var sendResult = await producer.Send(msg);
@@ -154,15 +155,19 @@ namespace tests
[TestMethod]
public async Task TestSendMessage_Failure()
{
- var producer = new Producer($"{HOST}:{PORT}");
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
+ var producer = new Producer($"{HOST}:{PORT}")
+ {
+ CredentialsProvider = new ConfigFileCredentialsProvider(),
+ Region = "cn-hangzhou-pre"
+ };
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
- msg.MessageGroup = "Group-0";
- msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ var msg = new Message(TOPIC, body)
+ {
+ MessageGroup = "Group-0",
+ DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10)
+ };
Assert.IsTrue(msg.Scheduled());
try
@@ -176,10 +181,9 @@ namespace tests
await producer.Shutdown();
}
- private static string topic = "cpp_sdk_standard";
-
- private static string HOST = "127.0.0.1";
- private static int PORT = 8081;
+ private const string TOPIC = "cpp_sdk_standard";
+ private const string HOST = "127.0.0.1";
+ private const int PORT = 8081;
}
}
\ No newline at end of file
diff --git a/csharp/tests/RpcClientTest.cs b/csharp/tests/RpcClientTest.cs
index a1ecf82..b438047 100644
--- a/csharp/tests/RpcClientTest.cs
+++ b/csharp/tests/RpcClientTest.cs
@@ -39,7 +39,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var cmd = new rmq::TelemetryCommand();
cmd.Settings = new rmq::Settings();
@@ -107,7 +107,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
request.Topic.Name = "cpp_sdk_standard";
@@ -128,7 +128,7 @@ namespace Org.Apache.Rocketmq
var rpc_client = new RpcClient(target);
var client_config = new ClientConfig();
var metadata = new grpc::Metadata();
- Signature.sign(client_config, metadata);
+ Signature.Sign(client_config, metadata);
var request = new rmq::SendMessageRequest();
var message = new rmq::Message();
diff --git a/csharp/tests/SignatureTest.cs b/csharp/tests/SignatureTest.cs
index 16d0f46..12d1c10 100644
--- a/csharp/tests/SignatureTest.cs
+++ b/csharp/tests/SignatureTest.cs
@@ -15,11 +15,11 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using grpc = global::Grpc.Core;
+using grpc = Grpc.Core;
using Moq;
-using System;
+using Org.Apache.Rocketmq;
-namespace Org.Apache.Rocketmq
+namespace tests
{
[TestClass]
@@ -27,11 +27,10 @@ namespace Org.Apache.Rocketmq
{
[TestMethod]
- public void testSign()
+ public void TestSign()
{
var mock = new Mock<IClientConfig>();
mock.Setup(x => x.getGroupName()).Returns("G1");
- mock.Setup(x => x.tenantId()).Returns("Tenant-id");
mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
mock.Setup(x => x.serviceName()).Returns("mq");
mock.Setup(x => x.region()).Returns("cn-hangzhou");
@@ -42,7 +41,7 @@ namespace Org.Apache.Rocketmq
mock.Setup(x => x.credentialsProvider()).Returns(credentialsProvider);
var metadata = new grpc::Metadata();
- Signature.sign(mock.Object, metadata);
+ Signature.Sign(mock.Object, metadata);
Assert.IsNotNull(metadata.Get(MetadataConstants.AUTHORIZATION));
}
}