You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:16 UTC
[rocketmq-client-csharp] 02/04: WIP: write unit tests
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit 4433efa4b1710dc0c3116d89f4f957c02c31bdc3
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 11:15:30 2022 +0800
WIP: write unit tests
---
examples/Program.cs | 15 ++--
rocketmq-client-csharp/AccessPoint.cs | 5 ++
rocketmq-client-csharp/Client.cs | 28 ++++++-
rocketmq-client-csharp/ClientConfig.cs | 66 ++++++++++------
rocketmq-client-csharp/ClientManager.cs | 2 +-
.../ConfigFileCredentialsProvider.cs | 30 ++++---
rocketmq-client-csharp/Credentials.cs | 32 +++++---
rocketmq-client-csharp/IClient.cs | 4 +-
rocketmq-client-csharp/IClientConfig.cs | 6 +-
rocketmq-client-csharp/IClientManager.cs | 6 +-
rocketmq-client-csharp/ICredentialsProvider.cs | 6 +-
rocketmq-client-csharp/IProducer.cs | 6 +-
rocketmq-client-csharp/Message.cs | 32 +++++---
rocketmq-client-csharp/MessageType.cs | 3 +-
rocketmq-client-csharp/MetadataConstants.cs | 6 +-
rocketmq-client-csharp/Producer.cs | 2 +-
rocketmq-client-csharp/PushConsumer.cs | 2 +-
rocketmq-client-csharp/RpcClient.cs | 3 +-
rocketmq-client-csharp/SendStatus.cs | 6 +-
rocketmq-client-csharp/Session.cs | 39 +++++----
rocketmq-client-csharp/Signature.cs | 26 +++---
rocketmq-client-csharp/SimpleConsumer.cs | 92 ++++++++++++++++++++++
.../StaticCredentialsProvider.cs | 12 ++-
tests/ClientConfigTest.cs | 9 ++-
tests/ClientManagerTest.cs | 11 ++-
tests/ConfigFileCredentialsProviderTest.cs | 9 ++-
tests/DateTimeTest.cs | 17 ++--
tests/MessageTest.cs | 21 +++--
tests/PushConsumerTest.cs | 10 ++-
tests/SendResultTest.cs | 14 ++--
tests/SignatureTest.cs | 8 +-
tests/{SendResultTest.cs => SimpleConsumerTest.cs} | 39 +++++----
tests/StaticCredentialsProviderTest.cs | 9 ++-
tests/TopicTest.cs | 23 +++---
tests/UnitTest1.cs | 46 +++++------
35 files changed, 447 insertions(+), 198 deletions(-)
diff --git a/examples/Program.cs b/examples/Program.cs
index d96f41e..09a1674 100644
--- a/examples/Program.cs
+++ b/examples/Program.cs
@@ -5,20 +5,24 @@ using System.Threading;
namespace examples
{
- class Foo {
+ class Foo
+ {
public int bar = 1;
}
class Program
{
- static void RT(Action action, int seconds, CancellationToken token) {
- if (null == action) {
+ static void RT(Action action, int seconds, CancellationToken token)
+ {
+ if (null == action)
+ {
return;
}
Task.Run(async () =>
{
- while(!token.IsCancellationRequested) {
+ while (!token.IsCancellationRequested)
+ {
action();
await Task.Delay(TimeSpan.FromSeconds(seconds), token);
}
@@ -44,7 +48,8 @@ namespace examples
ThreadPool.QueueUserWorkItem((Object stateInfo) =>
{
Console.WriteLine("From ThreadPool");
- if (stateInfo is Foo) {
+ if (stateInfo is Foo)
+ {
Console.WriteLine("Foo: bar=" + (stateInfo as Foo).bar);
}
}, new Foo());
diff --git a/rocketmq-client-csharp/AccessPoint.cs b/rocketmq-client-csharp/AccessPoint.cs
index f97d216..cf4e1f4 100644
--- a/rocketmq-client-csharp/AccessPoint.cs
+++ b/rocketmq-client-csharp/AccessPoint.cs
@@ -33,5 +33,10 @@ namespace Org.Apache.Rocketmq
get { return _port; }
set { _port = value; }
}
+
+ public string TargetUrl()
+ {
+ return $"https://{_host}:{_port}";
+ }
}
}
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 1d32095..1eb368e 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -33,15 +33,31 @@ namespace Org.Apache.Rocketmq
protected Client(AccessPoint accessPoint, string resourceNamespace)
{
+ _accessPoint = accessPoint;
+
// Support IPv4 for now
AccessPointScheme = rmq::AddressScheme.Ipv4;
-
var serviceEndpoint = new rmq::Address();
serviceEndpoint.Host = accessPoint.Host;
serviceEndpoint.Port = accessPoint.Port;
AccessPointEndpoints = new List<rmq::Address> { serviceEndpoint };
_resourceNamespace = resourceNamespace;
+
+ _clientSettings = new rmq::Settings();
+
+ _clientSettings.AccessPoint = new rmq::Endpoints();
+ _clientSettings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+ _clientSettings.AccessPoint.Addresses.Add(serviceEndpoint);
+
+ _clientSettings.RequestTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(TimeSpan.FromSeconds(3));
+
+ _clientSettings.UserAgent = new rmq.UA();
+ _clientSettings.UserAgent.Language = rmq::Language.DotNet;
+ _clientSettings.UserAgent.Version = "5.0.0";
+ _clientSettings.UserAgent.Platform = Environment.OSVersion.ToString();
+ _clientSettings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+
Manager = ClientManagerFactory.getClientManager(resourceNamespace);
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
@@ -283,9 +299,9 @@ namespace Org.Apache.Rocketmq
return $"https://{address.Host}:{address.Port}";
}
- public void buildClientSetting(rmq::Settings settings)
+ public virtual void BuildClientSetting(rmq::Settings settings)
{
-
+ settings.MergeFrom(_clientSettings);
}
public void createSession(string url)
@@ -388,6 +404,12 @@ namespace Org.Apache.Rocketmq
private readonly CancellationTokenSource _updateTopicRouteCts;
private readonly CancellationTokenSource _healthCheckCts;
+
+ protected readonly AccessPoint _accessPoint;
+
+ // This field is subject to changes from servers.
+ protected rmq::Settings _clientSettings;
+
private Random random = new Random();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 6dc3eba..0d99cb1 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -18,11 +18,14 @@ using System;
using System.Collections.Generic;
using rmq = Apache.Rocketmq.V2;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
- public class ClientConfig : IClientConfig {
+ public class ClientConfig : IClientConfig
+ {
- public ClientConfig() {
+ public ClientConfig()
+ {
var hostName = System.Net.Dns.GetHostName();
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
@@ -34,39 +37,50 @@ namespace Org.Apache.Rocketmq {
this._publishing = new Publishing();
}
- public string region() {
+ public string region()
+ {
return _region;
}
- public string Region {
+ public string Region
+ {
set { _region = value; }
}
- public string serviceName() {
+ public string serviceName()
+ {
return _serviceName;
}
- public string ServiceName {
+ public string ServiceName
+ {
set { _serviceName = value; }
}
- public string resourceNamespace() {
+ public string resourceNamespace()
+ {
return _resourceNamespace;
}
- public string ResourceNamespace {
+ public string ResourceNamespace
+ {
+ get { return _resourceNamespace; }
set { _resourceNamespace = value; }
}
- public ICredentialsProvider credentialsProvider() {
+ public ICredentialsProvider credentialsProvider()
+ {
return credentialsProvider_;
}
-
- public ICredentialsProvider CredentialsProvider {
+
+ public ICredentialsProvider CredentialsProvider
+ {
set { credentialsProvider_ = value; }
}
- public string tenantId() {
+ public string tenantId()
+ {
return _tenantId;
}
- public string TenantId {
+ public string TenantId
+ {
set { _tenantId = value; }
}
@@ -82,32 +96,40 @@ namespace Org.Apache.Rocketmq {
}
}
- public TimeSpan getLongPollingTimeout() {
+ public TimeSpan getLongPollingTimeout()
+ {
return longPollingIoTimeout_;
}
- public TimeSpan LongPollingTimeout {
+ public TimeSpan LongPollingTimeout
+ {
set { longPollingIoTimeout_ = value; }
}
- public string getGroupName() {
+ public string getGroupName()
+ {
return groupName_;
}
- public string GroupName {
+ public string GroupName
+ {
set { groupName_ = value; }
}
- public string clientId() {
+ public string clientId()
+ {
return clientId_;
}
- public bool isTracingEnabled() {
+ public bool isTracingEnabled()
+ {
return tracingEnabled_;
}
- public bool TracingEnabled {
+ public bool TracingEnabled
+ {
set { tracingEnabled_ = value; }
}
- public void setInstanceName(string instanceName) {
+ public void setInstanceName(string instanceName)
+ {
this.instanceName_ = instanceName;
}
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index a0b377b..54ceff2 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -152,7 +152,7 @@ namespace Org.Apache.Rocketmq
// TODO:
List<Message> messages = new List<Message>();
-
+
return messages;
}
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 9a3baa3..39dfd7e 100644
--- a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -19,36 +19,46 @@ using System;
using System.Text.Json;
using System.Collections.Generic;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
/**
* File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
* A sample config content is as follows:
* {"AccessKey": "key", "AccessSecret": "secret"}
*/
- public class ConfigFileCredentialsProvider : ICredentialsProvider {
+ public class ConfigFileCredentialsProvider : ICredentialsProvider
+ {
- public ConfigFileCredentialsProvider() {
+ public ConfigFileCredentialsProvider()
+ {
var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
string configFileRelativePath = "/.rocketmq/config";
- if (!File.Exists(home + configFileRelativePath)) {
+ if (!File.Exists(home + configFileRelativePath))
+ {
return;
}
- try {
- using (var reader = new StreamReader(home + configFileRelativePath)) {
+ try
+ {
+ using (var reader = new StreamReader(home + configFileRelativePath))
+ {
string json = reader.ReadToEnd();
var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
accessKey = kv["AccessKey"];
accessSecret = kv["AccessSecret"];
valid = true;
- }
- } catch (IOException e) {
+ }
+ }
+ catch (IOException)
+ {
}
}
- public Credentials getCredentials() {
- if (!valid) {
+ public Credentials getCredentials()
+ {
+ if (!valid)
+ {
return null;
}
diff --git a/rocketmq-client-csharp/Credentials.cs b/rocketmq-client-csharp/Credentials.cs
index 2ccafc8..a73b000 100644
--- a/rocketmq-client-csharp/Credentials.cs
+++ b/rocketmq-client-csharp/Credentials.cs
@@ -17,27 +17,34 @@
using System;
-namespace Org.Apache.Rocketmq {
- public class Credentials {
+namespace Org.Apache.Rocketmq
+{
+ public class Credentials
+ {
- public Credentials(string accessKey, string accessSecret) {
+ public Credentials(string accessKey, string accessSecret)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
}
- public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant) {
+ public Credentials(string accessKey, string accessSecret, string sessionToken, DateTime expirationInstant)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
this.sessionToken = sessionToken;
this.expirationInstant = expirationInstant;
}
- public bool empty() {
+ public bool empty()
+ {
return String.IsNullOrEmpty(accessKey) || String.IsNullOrEmpty(accessSecret);
}
- public bool expired() {
- if (DateTime.MinValue == expirationInstant) {
+ public bool expired()
+ {
+ if (DateTime.MinValue == expirationInstant)
+ {
return false;
}
@@ -45,17 +52,20 @@ namespace Org.Apache.Rocketmq {
}
private string accessKey;
- public string AccessKey {
+ public string AccessKey
+ {
get { return accessKey; }
}
-
+
private string accessSecret;
- public string AccessSecret {
+ public string AccessSecret
+ {
get { return accessSecret; }
}
private string sessionToken;
- public string SessionToken {
+ public string SessionToken
+ {
get { return sessionToken; }
}
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index f4115a2..abdcc21 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -28,6 +28,8 @@ namespace Org.Apache.Rocketmq
Task<bool> NotifyClientTermination();
- void buildClientSetting(rmq::Settings settings);
+ void BuildClientSetting(rmq::Settings settings);
+
+
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index 3726ac4..438d7a8 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -16,8 +16,10 @@
*/
using System;
-namespace Org.Apache.Rocketmq {
- public interface IClientConfig {
+namespace Org.Apache.Rocketmq
+{
+ public interface IClientConfig
+ {
string region();
string serviceName();
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index d5c3ea3..afccfde 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -22,8 +22,10 @@ using grpc = global::Grpc.Core;
using rmq = Apache.Rocketmq.V2;
-namespace Org.Apache.Rocketmq {
- public interface IClientManager {
+namespace Org.Apache.Rocketmq
+{
+ public interface IClientManager
+ {
IRpcClient GetRpcClient(string target);
grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(string target, grpc::Metadata metadata);
diff --git a/rocketmq-client-csharp/ICredentialsProvider.cs b/rocketmq-client-csharp/ICredentialsProvider.cs
index 80e908f..1fb892b 100644
--- a/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace Org.Apache.Rocketmq {
- public interface ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+ public interface ICredentialsProvider
+ {
Credentials getCredentials();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index cbb82d4..9c30c6c 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -17,8 +17,10 @@
using System.Threading.Tasks;
-namespace Org.Apache.Rocketmq {
- public interface IProducer {
+namespace Org.Apache.Rocketmq
+{
+ public interface IProducer
+ {
void Start();
void Shutdown();
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 5cbf1aa..b8b0e98 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -19,16 +19,20 @@ using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
- public class Message {
- public Message() : this(null, null) {
+ public class Message
+ {
+ public Message() : this(null, null)
+ {
}
- public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+ public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
- public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+ public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
+ {
}
- public Message(string topic, string tag, List<string> keys, byte[] body) {
+ public Message(string topic, string tag, List<string> keys, byte[] body)
+ {
this.messageId = SequenceGenerator.Instance.Next();
this.maxAttemptTimes = 3;
this.topic = topic;
@@ -58,37 +62,43 @@ namespace Org.Apache.Rocketmq
private string topic;
- public string Topic {
+ public string Topic
+ {
get { return topic; }
set { this.topic = value; }
}
private byte[] body;
- public byte[] Body {
+ public byte[] Body
+ {
get { return body; }
set { this.body = value; }
}
private string tag;
- public string Tag {
+ public string Tag
+ {
get { return tag; }
set { this.tag = value; }
}
private List<string> keys;
- public List<string> Keys{
+ public List<string> Keys
+ {
get { return keys; }
set { this.keys = value; }
}
private Dictionary<string, string> userProperties;
- public Dictionary<string, string> UserProperties {
+ public Dictionary<string, string> UserProperties
+ {
get { return userProperties; }
set { this.userProperties = value; }
}
private Dictionary<string, string> systemProperties;
- internal Dictionary<string, string> SystemProperties {
+ internal Dictionary<string, string> SystemProperties
+ {
get { return systemProperties; }
set { this.systemProperties = value; }
}
diff --git a/rocketmq-client-csharp/MessageType.cs b/rocketmq-client-csharp/MessageType.cs
index 8373496..a459e93 100644
--- a/rocketmq-client-csharp/MessageType.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -17,7 +17,8 @@
namespace Org.Apache.Rocketmq
{
- public enum MessageType {
+ public enum MessageType
+ {
Normal,
Fifo,
Delay,
diff --git a/rocketmq-client-csharp/MetadataConstants.cs b/rocketmq-client-csharp/MetadataConstants.cs
index 33bd66e..5381595 100644
--- a/rocketmq-client-csharp/MetadataConstants.cs
+++ b/rocketmq-client-csharp/MetadataConstants.cs
@@ -17,8 +17,10 @@
using System;
-namespace Org.Apache.Rocketmq {
- public class MetadataConstants {
+namespace Org.Apache.Rocketmq
+{
+ public class MetadataConstants
+ {
public const string TENANT_ID_KEY = "x-mq-tenant-id";
public const string NAMESPACE_KEY = "x-mq-namespace";
public const string AUTHORIZATION = "authorization";
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index ccddc8c..bfcc1d3 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -139,6 +139,6 @@ namespace Org.Apache.Rocketmq
}
private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
- private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ private static new readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index def9be4..909e7a2 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -201,7 +201,7 @@ namespace Org.Apache.Rocketmq
}
}
}
- catch (System.Exception e)
+ catch (System.Exception)
{
// TODO: log exception raised.
}
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index f56a07f..e0a2caf 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -125,7 +125,8 @@ namespace Org.Apache.Rocketmq
var callOptions = new CallOptions(metadata, deadline);
var call = _stub.ReceiveMessage(request, callOptions);
var result = new List<rmq::ReceiveMessageResponse>();
- while(await call.ResponseStream.MoveNext()) {
+ while (await call.ResponseStream.MoveNext())
+ {
result.Add(call.ResponseStream.Current);
}
return result;
diff --git a/rocketmq-client-csharp/SendStatus.cs b/rocketmq-client-csharp/SendStatus.cs
index b20e1c5..7586d22 100644
--- a/rocketmq-client-csharp/SendStatus.cs
+++ b/rocketmq-client-csharp/SendStatus.cs
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-namespace Org.Apache.Rocketmq {
- public enum SendStatus {
+namespace Org.Apache.Rocketmq
+{
+ public enum SendStatus
+ {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index dc13dc9..3e234f2 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -33,6 +33,7 @@ namespace Org.Apache.Rocketmq
{
this._target = target;
this._stream = stream;
+ this._client = client;
}
public async Task Loop()
@@ -41,36 +42,40 @@ namespace Org.Apache.Rocketmq
var writer = this._stream.RequestStream;
var request = new rmq::TelemetryCommand();
request.Settings = new rmq::Settings();
- _client.buildClientSetting(request.Settings);
+ _client.BuildClientSetting(request.Settings);
await writer.WriteAsync(request);
+ Logger.Debug($"Writing Client Settings Done: {request.Settings.ToString()}");
while (!_cts.IsCancellationRequested)
{
if (await reader.MoveNext(_cts.Token))
{
var cmd = reader.Current;
+ Logger.Debug($"Received a TelemetryCommand: {cmd.ToString()}");
switch (cmd.CommandCase)
{
case rmq::TelemetryCommand.CommandOneofCase.None:
- {
- Logger.Warn($"Telemetry failed: {cmd.Status}");
- break;
- }
+ {
+ Logger.Warn($"Telemetry failed: {cmd.Status}");
+ break;
+ }
case rmq::TelemetryCommand.CommandOneofCase.Settings:
- {
- break;
- }
+ {
+
+ Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
+ break;
+ }
case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
- {
- break;
- }
+ {
+ break;
+ }
case rmq::TelemetryCommand.CommandOneofCase.RecoverOrphanedTransactionCommand:
- {
- break;
- }
+ {
+ break;
+ }
case rmq::TelemetryCommand.CommandOneofCase.VerifyMessageCommand:
- {
- break;
- }
+ {
+ break;
+ }
}
}
}
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index c249253..2331b53 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -19,30 +19,38 @@ using System.Text;
using grpc = global::Grpc.Core;
using System.Security.Cryptography;
-namespace Org.Apache.Rocketmq {
- public class Signature {
- public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
+namespace Org.Apache.Rocketmq
+{
+ public class Signature
+ {
+ public static void sign(IClientConfig clientConfig, grpc::Metadata metadata)
+ {
metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
metadata.Add(MetadataConstants.CLIENT_ID_KEY, clientConfig.clientId());
- if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
+ if (!String.IsNullOrEmpty(clientConfig.tenantId()))
+ {
metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
}
- if (!String.IsNullOrEmpty(clientConfig.resourceNamespace())) {
+ if (!String.IsNullOrEmpty(clientConfig.resourceNamespace()))
+ {
metadata.Add(MetadataConstants.NAMESPACE_KEY, clientConfig.resourceNamespace());
}
string time = DateTime.Now.ToString(MetadataConstants.DATE_TIME_FORMAT);
metadata.Add(MetadataConstants.DATE_TIME_KEY, time);
- if (null != clientConfig.credentialsProvider()) {
+ if (null != clientConfig.credentialsProvider())
+ {
var credentials = clientConfig.credentialsProvider().getCredentials();
- if (null == credentials || credentials.expired()) {
+ if (null == credentials || credentials.expired())
+ {
return;
}
- if (!String.IsNullOrEmpty(credentials.SessionToken)) {
+ if (!String.IsNullOrEmpty(credentials.SessionToken))
+ {
metadata.Add(MetadataConstants.STS_SESSION_TOKEN, credentials.SessionToken);
}
@@ -51,7 +59,7 @@ namespace Org.Apache.Rocketmq {
HMACSHA1 signer = new HMACSHA1(secretData);
byte[] digest = signer.ComputeHash(data);
string hmac = BitConverter.ToString(digest).Replace("-", "");
- string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
+ string authorization = string.Format("{0} {1}={2}/{3}/{4}, {5}={6}, {7}={8}",
MetadataConstants.ALGORITHM_KEY,
MetadataConstants.CREDENTIAL_KEY,
credentials.AccessKey,
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 0000000..4c447c9
--- /dev/null
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using rmq = Apache.Rocketmq.V2;
+using NLog;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace Org.Apache.Rocketmq
+{
+ public class SimpleConsumer : Client
+ {
+
+ public SimpleConsumer(AccessPoint accessPoint,
+ string resourceNamespace, string group)
+ : base(accessPoint, resourceNamespace)
+ {
+ fifo_ = false;
+ subscriptions_ = new ConcurrentDictionary<string, rmq.SubscriptionEntry>();
+ group_ = group;
+ }
+
+ public override void BuildClientSetting(rmq::Settings settings)
+ {
+ base.BuildClientSetting(settings);
+
+ settings.ClientType = rmq::ClientType.SimpleConsumer;
+ settings.Subscription = new rmq::Subscription();
+ settings.Subscription.Group = new rmq::Resource();
+ settings.Subscription.Group.Name = Group;
+ settings.Subscription.Group.ResourceNamespace = ResourceNamespace;
+
+ foreach (var entry in subscriptions_)
+ {
+ settings.Subscription.Subscriptions.Add(entry.Value);
+ }
+ }
+
+ public override void Start()
+ {
+ base.Start();
+ base.createSession(_accessPoint.TargetUrl());
+ }
+
+ public override void Shutdown()
+ {
+ base.Shutdown();
+ }
+
+ protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
+ {
+ }
+
+ public void Subscribe(string topic, rmq::FilterType filterType, string expression)
+ {
+ var entry = new rmq::SubscriptionEntry();
+ entry.Topic = new rmq::Resource();
+ entry.Topic.Name = topic;
+ entry.Topic.ResourceNamespace = ResourceNamespace;
+ entry.Expression = new rmq::FilterExpression();
+ entry.Expression.Type = filterType;
+ entry.Expression.Expression = expression;
+ subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
+ }
+
+ private string group_;
+
+ public string Group
+ {
+ get { return group_; }
+ }
+
+ private bool fifo_;
+
+ private ConcurrentDictionary<string, rmq::SubscriptionEntry> subscriptions_;
+
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticCredentialsProvider.cs b/rocketmq-client-csharp/StaticCredentialsProvider.cs
index d00dba6..edd810d 100644
--- a/rocketmq-client-csharp/StaticCredentialsProvider.cs
+++ b/rocketmq-client-csharp/StaticCredentialsProvider.cs
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-namespace Org.Apache.Rocketmq {
- public class StaticCredentialsProvider : ICredentialsProvider {
+namespace Org.Apache.Rocketmq
+{
+ public class StaticCredentialsProvider : ICredentialsProvider
+ {
- public StaticCredentialsProvider(string accessKey, string accessSecret) {
+ public StaticCredentialsProvider(string accessKey, string accessSecret)
+ {
this.accessKey = accessKey;
this.accessSecret = accessSecret;
}
- public Credentials getCredentials() {
+ public Credentials getCredentials()
+ {
return new Credentials(accessKey, accessSecret);
}
diff --git a/tests/ClientConfigTest.cs b/tests/ClientConfigTest.cs
index 427d1d2..4d8dec1 100644
--- a/tests/ClientConfigTest.cs
+++ b/tests/ClientConfigTest.cs
@@ -17,11 +17,14 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ClientConfigTest {
+ public class ClientConfigTest
+ {
[TestMethod]
- public void testClientId() {
+ public void testClientId()
+ {
var clientConfig = new ClientConfig();
string clientId = clientConfig.clientId();
Assert.IsTrue(clientId.Contains("@"));
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
index 850db63..af5983c 100644
--- a/tests/ClientManagerTest.cs
+++ b/tests/ClientManagerTest.cs
@@ -19,13 +19,16 @@ using Grpc.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using rmq = Apache.Rocketmq.V2;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ClientManagerTest {
-
+ public class ClientManagerTest
+ {
+
[TestMethod]
- public void TestResolveRoute() {
+ public void TestResolveRoute()
+ {
string topic = "cpp_sdk_standard";
string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
var request = new rmq::QueryRouteRequest();
diff --git a/tests/ConfigFileCredentialsProviderTest.cs b/tests/ConfigFileCredentialsProviderTest.cs
index 0d46b98..7741295 100644
--- a/tests/ConfigFileCredentialsProviderTest.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -18,11 +18,14 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class ConfigFileCredentialsProviderTest {
+ public class ConfigFileCredentialsProviderTest
+ {
[TestMethod]
- public void testGetCredentials() {
+ public void testGetCredentials()
+ {
var provider = new ConfigFileCredentialsProvider();
var credentials = provider.getCredentials();
Assert.IsNotNull(credentials);
diff --git a/tests/DateTimeTest.cs b/tests/DateTimeTest.cs
index 0d9a2a7..fdf7d53 100644
--- a/tests/DateTimeTest.cs
+++ b/tests/DateTimeTest.cs
@@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-namespace Org.Apache.Rocketmq {
-
+namespace Org.Apache.Rocketmq
+{
+
[TestClass]
- public class DateTimeTest {
-
+ public class DateTimeTest
+ {
+
[TestMethod]
- public void testFormat() {
+ public void testFormat()
+ {
DateTime instant = new DateTime(2022, 02, 15, 08, 31, 56);
- string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
+ string time = instant.ToString(MetadataConstants.DATE_TIME_FORMAT);
string expected = "20220215T083156Z";
Assert.AreEqual(time, expected);
}
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index 2de9f54..f1c71f8 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -19,12 +19,15 @@ using System;
using System.Text;
using System.Collections.Generic;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class MessageTest {
+ public class MessageTest
+ {
[TestMethod]
- public void testCtor() {
+ public void testCtor()
+ {
var msg1 = new Message();
Assert.IsNotNull(msg1.MessageId);
Assert.IsTrue(msg1.MessageId.StartsWith("01"));
@@ -36,7 +39,8 @@ namespace Org.Apache.Rocketmq {
}
[TestMethod]
- public void testCtor2() {
+ public void testCtor2()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -49,7 +53,8 @@ namespace Org.Apache.Rocketmq {
}
[TestMethod]
- public void testCtor3() {
+ public void testCtor3()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -63,7 +68,8 @@ namespace Org.Apache.Rocketmq {
}
[TestMethod]
- public void testCtor4() {
+ public void testCtor4()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
@@ -81,7 +87,8 @@ namespace Org.Apache.Rocketmq {
}
[TestMethod]
- public void testCtor5() {
+ public void testCtor5()
+ {
string topic = "T1";
string bodyString = "body";
byte[] body = Encoding.ASCII.GetBytes(bodyString);
diff --git a/tests/PushConsumerTest.cs b/tests/PushConsumerTest.cs
index 444530b..78f01de 100644
--- a/tests/PushConsumerTest.cs
+++ b/tests/PushConsumerTest.cs
@@ -24,26 +24,28 @@ namespace Org.Apache.Rocketmq
public class TestMessageListener : IMessageListener
{
- public async Task Consume(List<Message> messages, List<Message> failed)
+ public Task Consume(List<Message> messages, List<Message> failed)
{
foreach (var message in messages)
{
Console.WriteLine("");
}
- }
+ return Task.CompletedTask;
+ }
}
public class CountableMessageListener : IMessageListener
{
- public async Task Consume(List<Message> messages, List<Message> failed)
+ public Task Consume(List<Message> messages, List<Message> failed)
{
foreach (var message in messages)
{
Console.WriteLine("{}", message.MessageId);
}
- }
+ return Task.CompletedTask;
+ }
}
[TestClass]
diff --git a/tests/SendResultTest.cs b/tests/SendResultTest.cs
index 475cf6d..4e3d9a0 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SendResultTest.cs
@@ -17,13 +17,16 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class SendResultTest {
+ public class SendResultTest
+ {
[TestMethod]
- public void testCtor() {
+ public void testCtor()
+ {
string messageId = new string("abc");
var sendResult = new SendReceipt(messageId);
Assert.AreEqual(messageId, sendResult.MessageId);
@@ -32,7 +35,8 @@ namespace Org.Apache.Rocketmq {
[TestMethod]
- public void testCtor2() {
+ public void testCtor2()
+ {
string messageId = new string("abc");
var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
Assert.AreEqual(messageId, sendResult.MessageId);
@@ -40,5 +44,5 @@ namespace Org.Apache.Rocketmq {
}
}
-
+
}
\ No newline at end of file
diff --git a/tests/SignatureTest.cs b/tests/SignatureTest.cs
index fd6b525..16d0f46 100644
--- a/tests/SignatureTest.cs
+++ b/tests/SignatureTest.cs
@@ -19,10 +19,12 @@ using grpc = global::Grpc.Core;
using Moq;
using System;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class SignatureTest {
+ public class SignatureTest
+ {
[TestMethod]
public void testSign()
@@ -33,7 +35,7 @@ namespace Org.Apache.Rocketmq {
mock.Setup(x => x.resourceNamespace()).Returns("mq:arn:test:");
mock.Setup(x => x.serviceName()).Returns("mq");
mock.Setup(x => x.region()).Returns("cn-hangzhou");
-
+
string accessKey = "key";
string accessSecret = "secret";
var credentialsProvider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/SendResultTest.cs b/tests/SimpleConsumerTest.cs
similarity index 55%
copy from tests/SendResultTest.cs
copy to tests/SimpleConsumerTest.cs
index 475cf6d..1bc1a45 100644
--- a/tests/SendResultTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -14,31 +14,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = Apache.Rocketmq.V2;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class SendResultTest {
+ public class SimpleConsumerTest
+ {
[TestMethod]
- public void testCtor() {
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId);
- Assert.AreEqual(messageId, sendResult.MessageId);
- Assert.AreEqual(SendStatus.SEND_OK, sendResult.Status);
- }
-
+ public void TestStart()
+ {
+ var accessPoint = new AccessPoint();
+ var host = "11.166.42.94";
+ var port = 8081;
+ accessPoint.Host = host;
+ accessPoint.Port = port;
+ var resourceNamespace = "";
+ var group = "GID_cpp_sdk_standard";
+ var topic = "cpp_sdk_standard";
- [TestMethod]
- public void testCtor2() {
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId, SendStatus.FLUSH_DISK_TIMEOUT);
- Assert.AreEqual(messageId, sendResult.MessageId);
- Assert.AreEqual(SendStatus.FLUSH_DISK_TIMEOUT, sendResult.Status);
+ var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
+ simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
+ simpleConsumer.Start();
+ Thread.Sleep(10_000);
}
}
-
+
+
}
\ No newline at end of file
diff --git a/tests/StaticCredentialsProviderTest.cs b/tests/StaticCredentialsProviderTest.cs
index 20b9450..8b5f012 100644
--- a/tests/StaticCredentialsProviderTest.cs
+++ b/tests/StaticCredentialsProviderTest.cs
@@ -17,12 +17,15 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
-namespace Org.Apache.Rocketmq {
+namespace Org.Apache.Rocketmq
+{
[TestClass]
- public class StaticCredentialsProviderTest {
+ public class StaticCredentialsProviderTest
+ {
[TestMethod]
- public void testGetCredentials() {
+ public void testGetCredentials()
+ {
var accessKey = "key";
var accessSecret = "secret";
var provider = new StaticCredentialsProvider(accessKey, accessSecret);
diff --git a/tests/TopicTest.cs b/tests/TopicTest.cs
index 7d9f3f4..9f386de 100644
--- a/tests/TopicTest.cs
+++ b/tests/TopicTest.cs
@@ -17,13 +17,16 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
-namespace Org.Apache.Rocketmq {
-
- [TestClass]
- public class TopicTest {
+namespace Org.Apache.Rocketmq
+{
- [TestMethod]
- public void testCompareTo() {
+ [TestClass]
+ public class TopicTest
+ {
+
+ [TestMethod]
+ public void testCompareTo()
+ {
List<Topic> topics = new List<Topic>();
topics.Add(new Topic("ns1", "t1"));
topics.Add(new Topic("ns0", "t1"));
@@ -36,13 +39,13 @@ namespace Org.Apache.Rocketmq {
Assert.AreEqual(topics[1].ResourceNamespace, "ns0");
Assert.AreEqual(topics[1].Name, "t1");
-
+
Assert.AreEqual(topics[2].ResourceNamespace, "ns1");
Assert.AreEqual(topics[2].Name, "t1");
-
+
}
- }
- }
\ No newline at end of file
+ }
+}
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index c0b9357..bbf537a 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -16,36 +16,38 @@ namespace tests
public void TestMethod1()
{
rmq::Permission perm = rmq::Permission.None;
- switch(perm) {
- case rmq::Permission.None:
- {
- Console.WriteLine("None");
- break;
- }
+ switch (perm)
+ {
+ case rmq::Permission.None:
+ {
+ Console.WriteLine("None");
+ break;
+ }
- case rmq::Permission.Read:
- {
- Console.WriteLine("Read");
- break;
- }
+ case rmq::Permission.Read:
+ {
+ Console.WriteLine("Read");
+ break;
+ }
- case rmq::Permission.Write:
- {
- Console.WriteLine("Write");
- break;
- }
+ case rmq::Permission.Write:
+ {
+ Console.WriteLine("Write");
+ break;
+ }
- case rmq::Permission.ReadWrite:
- {
- Console.WriteLine("ReadWrite");
- break;
- }
+ case rmq::Permission.ReadWrite:
+ {
+ Console.WriteLine("ReadWrite");
+ break;
+ }
}
}
[TestMethod]
- public void TestRpcClientImplCtor() {
+ public void TestRpcClientImplCtor()
+ {
RpcClient impl = new RpcClient("https://localhost:5001");
}