You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/18 07:05:45 UTC
[rocketmq-client-csharp] branch develop updated: Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new cbe22d2 Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)
cbe22d2 is described below
commit cbe22d2546118b1282bd0fe98ba82fe88323e2b2
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Fri Feb 18 15:05:05 2022 +0800
Implement and Test RPC QueryRoute, Heartbeat and Implement Producer basic logic (#2)
---
README.md | 2 +-
rocketmq-client-csharp/Broker.cs | 9 +
rocketmq-client-csharp/Client.cs | 261 +++++++++++++++++++++
rocketmq-client-csharp/ClientLoggerInterceptor.cs | 134 +++++++++++
rocketmq-client-csharp/ClientManager.cs | 88 ++++++-
.../{IRpcClient.cs => ClientManagerFactory.cs} | 74 +++---
.../ConfigFileCredentialsProvider.cs | 63 +++++
.../{IRpcClient.cs => IClient.cs} | 61 ++---
rocketmq-client-csharp/IClientManager.cs | 6 +
.../{IRpcClient.cs => INameServerResolver.cs} | 56 +++--
.../{IClientManager.cs => IProducer.cs} | 13 +-
rocketmq-client-csharp/IRpcClient.cs | 7 +-
rocketmq-client-csharp/Message.cs | 85 +++++++
.../{IRpcClient.cs => MessageType.cs} | 57 +++--
rocketmq-client-csharp/Producer.cs | 135 +++++++++++
rocketmq-client-csharp/PublishLoadBalancer.cs | 119 ++++++++++
rocketmq-client-csharp/RpcClient.cs | 23 +-
rocketmq-client-csharp/Signature.cs | 2 +-
.../{IRpcClient.cs => StaticNameServerResolver.cs} | 67 +++---
.../{IRpcClient.cs => TopicRouteException.cs} | 57 +++--
tests/ClientManagerTest.cs | 57 +++++
.../ConfigFileCredentialsProviderTest.cs | 17 +-
tests/MessageTest.cs | 112 +++++++++
tests/RpcClientTest.cs | 113 +++++++++
.../StaticNameServerResolverTest.cs | 64 ++---
25 files changed, 1447 insertions(+), 235 deletions(-)
diff --git a/README.md b/README.md
index 8191957..7c8935d 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@ dotnet build
#### Run Unit Tests
```sh
-dotnet test
+dotnet test -l "console;verbosity=detailed"
```
#### Run Examples
diff --git a/rocketmq-client-csharp/Broker.cs b/rocketmq-client-csharp/Broker.cs
index e909bf7..2f5f675 100644
--- a/rocketmq-client-csharp/Broker.cs
+++ b/rocketmq-client-csharp/Broker.cs
@@ -41,6 +41,15 @@ namespace org.apache.rocketmq {
get { return address; }
}
+ /**
+ * Context aware primary target URL.
+ */
+ public string targetUrl()
+ {
+ var addr = address.Addresses[0];
+ return string.Format("https://{0}:{1}", addr.Host, addr.Port);
+ }
+
public int CompareTo(Broker other) {
if (0 != name.CompareTo(other.name)) {
return name.CompareTo(other.name);
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
new file mode 100644
index 0000000..e3cb547
--- /dev/null
+++ b/rocketmq-client-csharp/Client.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Threading.Tasks;
+using System.Threading;
+using System;
+using rmq = apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+
+
+namespace org.apache.rocketmq
+{
+ public abstract class Client : ClientConfig, IClient
+ {
+
+ public Client(INameServerResolver resolver)
+ {
+ this.nameServerResolver = resolver;
+ this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace());
+ this.nameServerResolverCTS = new CancellationTokenSource();
+
+ this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
+ this.updateTopicRouteCTS = new CancellationTokenSource();
+ }
+
+ public virtual void start()
+ {
+ schedule(async () =>
+ {
+ await updateNameServerList();
+ }, 30, nameServerResolverCTS.Token);
+
+ schedule(async () =>
+ {
+ await updateTopicRoute();
+
+ }, 30, updateTopicRouteCTS.Token);
+
+ }
+
+ public virtual void shutdown()
+ {
+ updateTopicRouteCTS.Cancel();
+ nameServerResolverCTS.Cancel();
+ }
+
+ private async Task updateNameServerList()
+ {
+ List<string> nameServers = await nameServerResolver.resolveAsync();
+ if (0 == nameServers.Count)
+ {
+ // Whoops, something should be wrong. We got an empty name server list.
+ return;
+ }
+
+ if (nameServers.Equals(this.nameServers))
+ {
+ return;
+ }
+
+ // Name server list is updated.
+ // TODO: Locking is required
+ this.nameServers = nameServers;
+ this.currentNameServerIndex = 0;
+ }
+
+ private async Task updateTopicRoute()
+ {
+ if (null == nameServers || 0 == nameServers.Count)
+ {
+ List<string> list = await nameServerResolver.resolveAsync();
+ if (null != list && 0 != list.Count)
+ {
+ this.nameServers = list;
+ }
+ else
+ {
+ // TODO: log warning here.
+ return;
+ }
+ }
+
+ // We got one or more name servers available.
+ string nameServer = nameServers[currentNameServerIndex];
+
+ List<Task<TopicRouteData>> tasks = new List<Task<TopicRouteData>>();
+ foreach (var item in topicRouteTable)
+ {
+ tasks.Add(getRouteFor(item.Key, true));
+ }
+
+ // Update topic route data
+ TopicRouteData[] result = await Task.WhenAll(tasks);
+ foreach (var item in result)
+ {
+ if (null == item)
+ {
+ continue;
+ }
+
+ if (0 == item.Partitions.Count)
+ {
+ continue;
+ }
+
+ var topicName = item.Partitions[0].Topic.Name;
+ var existing = topicRouteTable[topicName];
+ if (!existing.Equals(item))
+ {
+ topicRouteTable[topicName] = item;
+ }
+ }
+ }
+
+ public void schedule(Action action, int seconds, CancellationToken token)
+ {
+ if (null == action)
+ {
+ // TODO: log warning
+ return;
+ }
+
+ Task.Run(async () =>
+ {
+ while (!token.IsCancellationRequested)
+ {
+ action();
+ await Task.Delay(TimeSpan.FromSeconds(seconds), token);
+ }
+ });
+ }
+
+ /**
+ * Parameters:
+ * topic
+ * Topic to query
+ * direct
+ * Indicate if we should by-pass cache and fetch route entries from name server.
+ */
+ public async Task<TopicRouteData> getRouteFor(string topic, bool direct)
+ {
+ if (!direct && topicRouteTable.ContainsKey(topic))
+ {
+ return topicRouteTable[topic];
+ }
+
+ if (null == nameServers || 0 == nameServers.Count)
+ {
+ List<string> list = await nameServerResolver.resolveAsync();
+ if (null != list && 0 != list.Count)
+ {
+ this.nameServers = list;
+ }
+ else
+ {
+ // TODO: log warning here.
+ return null;
+ }
+ }
+
+ // We got one or more name servers available.
+ string nameServer = nameServers[currentNameServerIndex];
+ var metadata = new grpc.Metadata();
+ Signature.sign(this, metadata);
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace();
+ request.Topic.Name = topic;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ string[] segments = nameServer.Split(":");
+ address.Host = segments[0];
+ address.Port = Int32.Parse(segments[1]);
+ request.Endpoints.Addresses.Add(address);
+ var target = string.Format("https://{0}:{1}", segments[0], segments[1]);
+ var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
+ return topicRouteData;
+ }
+
+ public abstract void prepareHeartbeatData(rmq::HeartbeatRequest request);
+
+ public void heartbeat()
+ {
+ List<string> endpoints = endpointsInUse();
+ if (0 == endpoints.Count)
+ {
+ return;
+ }
+
+ var heartbeatRequest = new rmq::HeartbeatRequest();
+ prepareHeartbeatData(heartbeatRequest);
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ }
+
+ public void healthCheck()
+ {
+
+ }
+
+ public async Task<bool> notifyClientTermination()
+ {
+ List<string> endpoints = endpointsInUse();
+ var request = new rmq::NotifyClientTerminationRequest();
+ request.ClientId = clientId();
+
+ var metadata = new grpc.Metadata();
+ Signature.sign(this, metadata);
+
+ List<Task<Boolean>> tasks = new List<Task<Boolean>>();
+
+ foreach (var endpoint in endpoints)
+ {
+ tasks.Add(clientManager.notifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+ }
+
+ bool[] results = await Task.WhenAll(tasks);
+ foreach (bool b in results)
+ {
+ if (!b)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private List<string> endpointsInUse()
+ {
+ //TODO: gather endpoints from route entries.
+ return new List<string>();
+ }
+
+ protected IClientManager clientManager;
+ private INameServerResolver nameServerResolver;
+ private CancellationTokenSource nameServerResolverCTS;
+ private List<string> nameServers;
+ private int currentNameServerIndex;
+
+ private ConcurrentDictionary<string, TopicRouteData> topicRouteTable;
+ private CancellationTokenSource updateTopicRouteCTS;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
new file mode 100644
index 0000000..59ec0f2
--- /dev/null
+++ b/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Interceptors;
+
+namespace org.apache.rocketmq
+{
+ public class ClientLoggerInterceptor : Interceptor
+ {
+ public override TResponse BlockingUnaryCall<TRequest, TResponse>(
+ TRequest request,
+ ClientInterceptorContext<TRequest, TResponse> context,
+ BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
+ {
+ LogCall(context.Method);
+ AddCallerMetadata(ref context);
+
+ return continuation(request, context);
+ }
+
+ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
+ TRequest request,
+ ClientInterceptorContext<TRequest, TResponse> context,
+ AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
+ {
+ LogCall(context.Method);
+ AddCallerMetadata(ref context);
+
+ var call = continuation(request, context);
+
+ return new AsyncUnaryCall<TResponse>(HandleResponse(call.ResponseAsync), call.ResponseHeadersAsync, call.GetStatus, call.GetTrailers, call.Dispose);
+ }
+
+ private async Task<TResponse> HandleResponse<TResponse>(Task<TResponse> t)
+ {
+ try
+ {
+ var response = await t;
+ Console.WriteLine($"Response received: {response}");
+ return response;
+ }
+ catch (Exception ex)
+ {
+ // Log error to the console.
+ // Note: Configuring .NET Core logging is the recommended way to log errors
+ // https://docs.microsoft.com/aspnet/core/grpc/diagnostics#grpc-client-logging
+ var initialColor = Console.ForegroundColor;
+ Console.ForegroundColor = ConsoleColor.Red;
+ Console.WriteLine($"Call error: {ex.Message}");
+ Console.ForegroundColor = initialColor;
+
+ throw;
+ }
+ }
+
+ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
+ ClientInterceptorContext<TRequest, TResponse> context,
+ AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ LogCall(context.Method);
+ AddCallerMetadata(ref context);
+
+ return continuation(context);
+ }
+
+ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
+ TRequest request,
+ ClientInterceptorContext<TRequest, TResponse> context,
+ AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ LogCall(context.Method);
+ AddCallerMetadata(ref context);
+
+ return continuation(request, context);
+ }
+
+ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
+ ClientInterceptorContext<TRequest, TResponse> context,
+ AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ LogCall(context.Method);
+ AddCallerMetadata(ref context);
+
+ return continuation(context);
+ }
+
+ private void LogCall<TRequest, TResponse>(Method<TRequest, TResponse> method)
+ where TRequest : class
+ where TResponse : class
+ {
+ var initialColor = Console.ForegroundColor;
+ Console.ForegroundColor = ConsoleColor.Green;
+ Console.WriteLine($"Starting call. Type: {method.Type}. Request: {typeof(TRequest)}. Response: {typeof(TResponse)}");
+ Console.ForegroundColor = initialColor;
+ }
+
+ private void AddCallerMetadata<TRequest, TResponse>(ref ClientInterceptorContext<TRequest, TResponse> context)
+ where TRequest : class
+ where TResponse : class
+ {
+ var headers = context.Options.Headers;
+
+ // Call doesn't have a headers collection to add to.
+ // Need to create a new context with headers for the call.
+ if (headers == null)
+ {
+ headers = new Metadata();
+ var options = context.Options.WithHeaders(headers);
+ context = new ClientInterceptorContext<TRequest, TResponse>(context.Method, context.Host, options);
+ }
+
+ // Add caller metadata to call headers
+ headers.Add("caller-user", Environment.UserName);
+ headers.Add("caller-machine", Environment.MachineName);
+ headers.Add("caller-os", Environment.OSVersion.ToString());
+ }
+ }
+}
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 4dd8790..59fec83 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -20,9 +20,12 @@ using System.Collections.Concurrent;
using rmq = global::apache.rocketmq.v1;
using Grpc.Net.Client;
using System;
+using System.Threading;
using System.Threading.Tasks;
using grpc = global::Grpc.Core;
using System.Collections.Generic;
+using Grpc.Core.Interceptors;
+using System.Net.Http;
namespace org.apache.rocketmq {
public class ClientManager : IClientManager {
@@ -33,8 +36,11 @@ namespace org.apache.rocketmq {
public IRpcClient getRpcClient(string target) {
if (!rpcClients.ContainsKey(target)) {
- using var channel = GrpcChannel.ForAddress(target);
- var client = new rmq.MessagingService.MessagingServiceClient(channel);
+ var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions {
+ HttpHandler = createHttpHandler()
+ });
+ var invoker = channel.Intercept(new ClientLoggerInterceptor());
+ var client = new rmq::MessagingService.MessagingServiceClient(invoker);
var rpcClient = new RpcClient(client);
if(rpcClients.TryAdd(target, rpcClient)) {
return rpcClient;
@@ -43,11 +49,32 @@ namespace org.apache.rocketmq {
return rpcClients[target];
}
- public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq.QueryRouteRequest request, TimeSpan timeout) {
+ /**
+ * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
+ * why parameters are configured this way.
+ */
+ public static HttpMessageHandler createHttpHandler()
+ {
+ var sslOptions = new System.Net.Security.SslClientAuthenticationOptions();
+ // Disable server certificate validation during development phase.
+ // Comment out the following line if server certificate validation is required.
+ sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+ var handler = new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
+ KeepAlivePingDelay = TimeSpan.FromSeconds(60),
+ KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
+ EnableMultipleHttp2Connections = true,
+ SslOptions = sslOptions,
+ };
+ return handler;
+ }
+
+ public async Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
+ {
var rpcClient = getRpcClient(target);
- var callOptions = new grpc::CallOptions();
- callOptions.WithDeadline(DateTime.Now.Add(timeout))
- .WithHeaders(metadata);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new grpc::CallOptions(metadata, deadline);
var queryRouteResponse = await rpcClient.queryRoute(request, callOptions);
if (queryRouteResponse.Common.Status.Code != ((int)Google.Rpc.Code.Ok)) {
@@ -63,22 +90,22 @@ namespace org.apache.rocketmq {
var id = partition.Id;
Permission permission = Permission.READ_WRITE;
switch (partition.Permission) {
- case rmq.Permission.None:
+ case rmq::Permission.None:
{
permission = Permission.NONE;
break;
}
- case rmq.Permission.Read:
+ case rmq::Permission.Read:
{
permission = Permission.READ;
break;
}
- case rmq.Permission.Write:
+ case rmq::Permission.Write:
{
permission = Permission.WRITE;
break;
}
- case rmq.Permission.ReadWrite:
+ case rmq::Permission.ReadWrite:
{
permission = Permission.READ_WRITE;
break;
@@ -87,15 +114,18 @@ namespace org.apache.rocketmq {
AddressScheme scheme = AddressScheme.IPv4;
switch(partition.Broker.Endpoints.Scheme) {
- case rmq.AddressScheme.Ipv4: {
+ case rmq::AddressScheme.Ipv4:
+ {
scheme = AddressScheme.IPv4;
break;
}
- case rmq.AddressScheme.Ipv6: {
+ case rmq::AddressScheme.Ipv6:
+ {
scheme = AddressScheme.IPv6;
break;
}
- case rmq.AddressScheme.DomainName: {
+ case rmq::AddressScheme.DomainName:
+ {
scheme = AddressScheme.DOMAIN_NAME;
break;
}
@@ -114,6 +144,38 @@ namespace org.apache.rocketmq {
return topicRouteData;
}
+ public async Task<Boolean> heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout)
+ {
+ var rpcClient = getRpcClient(target);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new grpc.CallOptions(metadata, deadline);
+ var response = await rpcClient.heartbeat(request, callOptions);
+ if (null == response)
+ {
+ return false;
+ }
+
+ return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+ }
+
+ public async Task<rmq::SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, rmq::SendMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = getRpcClient(target);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new grpc::CallOptions(metadata, deadline);
+ var response = await rpcClient.sendMessage(request, callOptions);
+ return response;
+ }
+
+ public async Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+ {
+ var rpcClient = getRpcClient(target);
+ var deadline = DateTime.UtcNow.Add(timeout);
+ var callOptions = new grpc::CallOptions(metadata, deadline);
+ rmq::NotifyClientTerminationResponse response = await rpcClient.notifyClientTermination(request, callOptions);
+ return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+ }
+
private ConcurrentDictionary<string, RpcClient> rpcClients;
}
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/ClientManagerFactory.cs
similarity index 50%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/ClientManagerFactory.cs
index d4102b8..3ca211d 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/ClientManagerFactory.cs
@@ -1,29 +1,45 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace org.apache.rocketmq
+{
+ public sealed class ClientManagerFactory
+ {
+ public static IClientManager getClientManager(string resourceNamespace)
+ {
+ if (clientManagers.ContainsKey(resourceNamespace))
+ {
+ return clientManagers[resourceNamespace];
+ }
+
+ var clientManager = new ClientManager();
+ // TODO: configure client managers.
+ if (clientManagers.TryAdd<string, IClientManager>(resourceNamespace, clientManager))
+ {
+ return clientManager;
+ }
+
+ return clientManagers[resourceNamespace];
+ }
+
+ private static ConcurrentDictionary<string, IClientManager> clientManagers = new ConcurrentDictionary<string, IClientManager>();
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
new file mode 100644
index 0000000..1381b3f
--- /dev/null
+++ b/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using System;
+using System.Text.Json;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq {
+
+ /**
+ * File-based credentials provider that reads JSON configurations from ${HOME}/.rocketmq/config
+ * A sample config content is as follows:
+ * {"AccessKey": "key", "AccessSecret": "secret"}
+ */
+ public class ConfigFileCredentialsProvider : ICredentialsProvider {
+
+ public ConfigFileCredentialsProvider() {
+ var home = Environment.GetFolderPath(Environment.SpecialFolder.UserProfile);
+ string configFileRelativePath = "/.rocketmq/config";
+ if (!File.Exists(home + configFileRelativePath)) {
+ return;
+ }
+
+ try {
+ using (var reader = new StreamReader(home + configFileRelativePath)) {
+ string json = reader.ReadToEnd();
+ var kv = JsonSerializer.Deserialize<Dictionary<string, string>>(json);
+ accessKey = kv["AccessKey"];
+ accessSecret = kv["AccessSecret"];
+ valid = true;
+ }
+ } catch (IOException e) {
+ }
+ }
+
+ public Credentials getCredentials() {
+ if (!valid) {
+ return null;
+ }
+
+ return new Credentials(accessKey, accessSecret);
+ }
+
+ private string accessKey;
+ private string accessSecret;
+
+ private bool valid = false;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IClient.cs
similarity index 78%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/IClient.cs
index d4102b8..7f3ed64 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -1,29 +1,32 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+ public interface IClient : IClientConfig
+ {
+
+ void heartbeat();
+
+ void healthCheck();
+
+ Task<bool> notifyClientTermination();
+
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index ea8ba55..08ed86a 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -26,5 +26,11 @@ namespace org.apache.rocketmq {
Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+ Task<Boolean> heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+
+ Task<Boolean> notifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
+
+ Task<SendMessageResponse> sendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/INameServerResolver.cs
similarity index 79%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/INameServerResolver.cs
index d4102b8..568098f 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/INameServerResolver.cs
@@ -1,29 +1,27 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+ public interface INameServerResolver
+ {
+ Task<List<string>> resolveAsync();
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IProducer.cs
similarity index 76%
copy from rocketmq-client-csharp/IClientManager.cs
copy to rocketmq-client-csharp/IProducer.cs
index ea8ba55..89f8955 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-using apache.rocketmq.v1;
-using System.Threading.Tasks;
using System;
-using grpc = global::Grpc.Core;
+using System.Threading.Tasks;
namespace org.apache.rocketmq {
- public interface IClientManager {
- IRpcClient getRpcClient(string target);
+ public interface IProducer {
+ void start();
- Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
+ void shutdown();
+ Task<SendResult> send(Message message);
+
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index d4102b8..0590bb0 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -23,7 +23,12 @@ namespace org.apache.rocketmq
{
public interface IRpcClient
{
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
+ Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
+ Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions);
+
+ Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions);
+
+ Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions);
}
}
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
new file mode 100644
index 0000000..282e8aa
--- /dev/null
+++ b/rocketmq-client-csharp/Message.cs
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Collections.Generic;
+namespace org.apache.rocketmq
+{
+
+ public class Message {
+ public Message() : this(null, null) {
+ }
+
+ public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) {}
+
+ public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body) {
+ }
+
+ public Message(string topic, string tag, List<string> keys, byte[] body) {
+ this.maxAttemptTimes = 3;
+ this.topic = topic;
+ this.tag = tag;
+ this.keys = keys;
+ this.body = body;
+ this.userProperties = new Dictionary<string, string>();
+ this.systemProperties = new Dictionary<string, string>();
+ }
+
+ private string topic;
+
+ public string Topic {
+ get { return topic; }
+ set { this.topic = value; }
+ }
+
+ private byte[] body;
+ public byte[] Body {
+ get { return body; }
+ set { this.body = value; }
+ }
+
+ private string tag;
+ public string Tag {
+ get { return tag; }
+ set { this.tag = value; }
+ }
+
+ private List<string> keys;
+ public List<string> Keys{
+ get { return keys; }
+ set { this.keys = value; }
+ }
+
+ private Dictionary<string, string> userProperties;
+ public Dictionary<string, string> UserProperties {
+ get { return userProperties; }
+ set { this.userProperties = value; }
+ }
+
+ private Dictionary<string, string> systemProperties;
+ internal Dictionary<string, string> SystemProperties {
+ get { return systemProperties; }
+ set { this.systemProperties = value; }
+ }
+
+ private int maxAttemptTimes;
+ public int MaxAttemptTimes
+ {
+ get { return maxAttemptTimes; }
+ set { maxAttemptTimes = value; }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/MessageType.cs
similarity index 76%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/MessageType.cs
index d4102b8..376b658 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/MessageType.cs
@@ -1,29 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace org.apache.rocketmq
+{
+ public enum MessageType {
+ Normal,
+ Fifo,
+ Delay,
+ Transaction,
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
new file mode 100644
index 0000000..ed837bc
--- /dev/null
+++ b/rocketmq-client-csharp/Producer.cs
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading.Tasks;
+using rmq = apache.rocketmq.v1;
+using pb = global::Google.Protobuf;
+using grpc = global::Grpc.Core;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+
+namespace org.apache.rocketmq
+{
+ public class Producer : Client, IProducer
+ {
+ public Producer(INameServerResolver resolver) : base(resolver)
+ {
+ this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+ }
+
+ public override void start()
+ {
+ base.start();
+ // More initalization
+ }
+
+ public override void shutdown()
+ {
+ // Release local resources
+ base.shutdown();
+ }
+
+ public override void prepareHeartbeatData(rmq::HeartbeatRequest request)
+ {
+
+ }
+
+ public async Task<SendResult> send(Message message)
+ {
+ if (!loadBalancer.ContainsKey(message.Topic))
+ {
+ var topicRouteData = await getRouteFor(message.Topic, false);
+ if (null == topicRouteData || null == topicRouteData.Partitions || 0 == topicRouteData.Partitions.Count)
+ {
+ throw new TopicRouteException(string.Format("No topic route for {0}", message.Topic));
+ }
+
+ var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
+ loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+ }
+
+ var publishLB = loadBalancer[message.Topic];
+
+ var request = new rmq::SendMessageRequest();
+ request.Message = new rmq::Message();
+ request.Message.Body = pb::ByteString.CopyFrom(message.Body);
+ request.Message.Topic = new rmq::Resource();
+ request.Message.Topic.ResourceNamespace = resourceNamespace();
+ request.Message.Topic.Name = message.Topic;
+
+ // User properties
+ foreach (var item in message.UserProperties)
+ {
+ request.Message.UserAttribute.Add(item.Key, item.Value);
+ }
+
+ request.Message.SystemAttribute = new rmq::SystemAttribute();
+ if (!string.IsNullOrEmpty(message.Tag))
+ {
+ request.Message.SystemAttribute.Tag = message.Tag;
+ }
+
+ if (0 != message.Keys.Count)
+ {
+ foreach (var key in message.Keys)
+ {
+ request.Message.SystemAttribute.Keys.Add(key);
+ }
+ }
+
+ // string target = "https://";
+ List<string> targets = new List<string>();
+ List<Partition> candidates = publishLB.select(message.MaxAttemptTimes);
+ foreach (var partition in candidates)
+ {
+ targets.Add(partition.Broker.targetUrl());
+ }
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+
+ Exception ex = null;
+
+ foreach (var target in targets)
+ {
+ try
+ {
+ rmq::SendMessageResponse response = await clientManager.sendMessage(target, metadata, request, getIoTimeout());
+ if (null != response && (int)global::Google.Rpc.Code.Ok == response.Common.Status.Code)
+ {
+ var messageId = response.MessageId;
+ return new SendResult(messageId);
+ }
+ }
+ catch (Exception e)
+ {
+ ex = e;
+ }
+ }
+
+ if (null != ex)
+ {
+ throw ex;
+ }
+
+ throw new Exception("Send message failed");
+ }
+
+ private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
new file mode 100644
index 0000000..9a1b66d
--- /dev/null
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+ public class PublishLoadBalancer
+ {
+ public PublishLoadBalancer(TopicRouteData route)
+ {
+ this.partitions = new List<Partition>();
+ foreach (var partition in route.Partitions)
+ {
+ if (Permission.NONE == partition.Permission)
+ {
+ continue;
+ }
+
+ if (Permission.READ == partition.Permission)
+ {
+ continue;
+ }
+
+ this.partitions.Add(partition);
+ }
+
+ this.partitions.Sort();
+ Random random = new Random();
+ this.roundRobinIndex = random.Next(0, this.partitions.Count);
+ }
+
+ public void update(TopicRouteData route)
+ {
+ List<Partition> partitions = new List<Partition>();
+ foreach (var partition in route.Partitions)
+ {
+ if (Permission.NONE == partition.Permission)
+ {
+ continue;
+ }
+
+ if (Permission.READ == partition.Permission)
+ {
+ continue;
+ }
+ partitions.Add(partition);
+ }
+ partitions.Sort();
+ this.partitions = partitions;
+ }
+
+ /**
+ * Accept a partition iff its broker is different.
+ */
+ private bool accept(List<Partition> existing, Partition partition)
+ {
+ if (0 == existing.Count)
+ {
+ return true;
+ }
+
+ foreach (var item in existing)
+ {
+ if (item.Broker.Equals(partition.Broker))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<Partition> select(int maxAttemptTimes)
+ {
+ List<Partition> result = new List<Partition>();
+
+ List<Partition> all = this.partitions;
+ if (0 == all.Count)
+ {
+ return result;
+ }
+ int start = ++roundRobinIndex;
+ int found = 0;
+
+ for (int i = 0; i < all.Count; i++)
+ {
+ int idx = ((start + i) & int.MaxValue) % all.Count;
+ if (accept(result, all[idx]))
+ {
+ result.Add(all[idx]);
+ if (++found >= maxAttemptTimes)
+ {
+ break;
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private List<Partition> partitions;
+
+ private int roundRobinIndex;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 0cc8354..0191e91 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -25,7 +25,8 @@ namespace org.apache.rocketmq {
stub = client;
}
- public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions) {
+ public async Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions)
+ {
var call = stub.QueryRouteAsync(request, callOptions);
var response = await call.ResponseAsync;
var status = call.GetStatus();
@@ -35,6 +36,26 @@ namespace org.apache.rocketmq {
return response;
}
+ public async Task<HeartbeatResponse> heartbeat(HeartbeatRequest request, grpc::CallOptions callOptions)
+ {
+ var call = stub.HeartbeatAsync(request, callOptions);
+ var response = await call.ResponseAsync;
+ return response;
+ }
+
+ public async Task<NotifyClientTerminationResponse> notifyClientTermination(NotifyClientTerminationRequest request, grpc::CallOptions callOptions)
+ {
+ var call = stub.NotifyClientTerminationAsync(request, callOptions);
+ var response = await call.ResponseAsync;
+ return response;
+ }
+
+ public async Task<SendMessageResponse> sendMessage(SendMessageRequest request, grpc::CallOptions callOptions)
+ {
+ var call = stub.SendMessageAsync(request, callOptions);
+ return await call.ResponseAsync;
+ }
+
private MessagingService.MessagingServiceClient stub;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Signature.cs b/rocketmq-client-csharp/Signature.cs
index 4f1fd93..70e038a 100644
--- a/rocketmq-client-csharp/Signature.cs
+++ b/rocketmq-client-csharp/Signature.cs
@@ -22,7 +22,7 @@ using System.Security.Cryptography;
namespace org.apache.rocketmq {
public class Signature {
public static void sign(IClientConfig clientConfig, grpc::Metadata metadata) {
- metadata.Add(MetadataConstants.LANGUAGE_KEY, "C#");
+ metadata.Add(MetadataConstants.LANGUAGE_KEY, "DOTNET");
metadata.Add(MetadataConstants.CLIENT_VERSION_KEY, "5.0.0");
if (!String.IsNullOrEmpty(clientConfig.tenantId())) {
metadata.Add(MetadataConstants.TENANT_ID_KEY, clientConfig.tenantId());
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/StaticNameServerResolver.cs
similarity index 69%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/StaticNameServerResolver.cs
index d4102b8..9f97599 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/StaticNameServerResolver.cs
@@ -1,29 +1,38 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq
+{
+ public class StaticNameServerResolver : INameServerResolver
+ {
+
+ public StaticNameServerResolver(List<string> nameServerList)
+ {
+ this.nameServerList = nameServerList;
+ }
+
+ public async Task<List<string>> resolveAsync()
+ {
+ return nameServerList;
+ }
+
+ private List<string> nameServerList;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/TopicRouteException.cs
similarity index 76%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to rocketmq-client-csharp/TopicRouteException.cs
index d4102b8..b520e72 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/TopicRouteException.cs
@@ -1,29 +1,28 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+namespace org.apache.rocketmq
+{
+ public class TopicRouteException : Exception
+ {
+ public TopicRouteException(string message) : base(message)
+ {
+
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/tests/ClientManagerTest.cs b/tests/ClientManagerTest.cs
new file mode 100644
index 0000000..0f8bff7
--- /dev/null
+++ b/tests/ClientManagerTest.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using rmq = apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace org.apache.rocketmq {
+
+ [TestClass]
+ public class ClientManagerTest {
+
+ [TestMethod]
+ public void testResolveRoute() {
+ string topic = "cpp_sdk_standard";
+ string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace;
+ request.Topic.Name = topic;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Host = "116.62.231.199";
+ address.Port = 80;
+ request.Endpoints.Addresses.Add(address);
+
+ var metadata = new grpc::Metadata();
+ var clientConfig = new ClientConfig();
+ var credentialsProvider = new ConfigFileCredentialsProvider();
+ clientConfig.CredentialsProvider = credentialsProvider;
+ clientConfig.ResourceNamespace = resourceNamespace;
+ clientConfig.Region = "cn-hangzhou-pre";
+ Signature.sign(clientConfig, metadata);
+ var clientManager = new ClientManager();
+ string target = "https://116.62.231.199:80";
+ var topicRouteData = clientManager.resolveRoute(target, metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ Console.WriteLine(topicRouteData);
+ }
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClientManager.cs b/tests/ConfigFileCredentialsProviderTest.cs
similarity index 70%
copy from rocketmq-client-csharp/IClientManager.cs
copy to tests/ConfigFileCredentialsProviderTest.cs
index ea8ba55..f94d364 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/tests/ConfigFileCredentialsProviderTest.cs
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-using apache.rocketmq.v1;
-using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
-using grpc = global::Grpc.Core;
namespace org.apache.rocketmq {
- public interface IClientManager {
- IRpcClient getRpcClient(string target);
-
- Task<TopicRouteData> resolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
-
+ [TestClass]
+ public class ConfigFileCredentialsProviderTest {
+ [TestMethod]
+ public void testGetCredentials() {
+ var provider = new ConfigFileCredentialsProvider();
+ var credentials = provider.getCredentials();
+ Assert.IsNotNull(credentials);
+ }
}
}
\ No newline at end of file
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
new file mode 100644
index 0000000..eebdc46
--- /dev/null
+++ b/tests/MessageTest.cs
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Text;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq {
+ [TestClass]
+ public class MessageTest {
+
+ [TestMethod]
+ public void testCtor() {
+ var msg1 = new Message();
+ Assert.IsNull(msg1.Topic);
+ Assert.IsNull(msg1.Body);
+ Assert.IsNull(msg1.Tag);
+ Assert.AreEqual(msg1.Keys.Count, 0);
+ Assert.AreEqual(msg1.UserProperties.Count, 0);
+ }
+
+ [TestMethod]
+ public void testCtor2() {
+ string topic = "T1";
+ string bodyString = "body";
+ byte[] body = Encoding.ASCII.GetBytes(bodyString);
+ var msg1 = new Message(topic, body);
+ Assert.AreEqual(msg1.Topic, topic);
+ Assert.AreEqual(msg1.Body, body);
+ Assert.IsNull(msg1.Tag);
+ Assert.AreEqual(msg1.Keys.Count, 0);
+ Assert.AreEqual(msg1.UserProperties.Count, 0);
+ }
+
+ [TestMethod]
+ public void testCtor3() {
+ string topic = "T1";
+ string bodyString = "body";
+ byte[] body = Encoding.ASCII.GetBytes(bodyString);
+ string tag = "TagA";
+ var msg1 = new Message(topic, tag, body);
+ Assert.AreEqual(msg1.Topic, topic);
+ Assert.AreEqual(msg1.Body, body);
+ Assert.AreEqual(msg1.Tag, tag);
+ Assert.AreEqual(msg1.Keys.Count, 0);
+ Assert.AreEqual(msg1.UserProperties.Count, 0);
+ }
+
+ [TestMethod]
+ public void testCtor4() {
+ string topic = "T1";
+ string bodyString = "body";
+ byte[] body = Encoding.ASCII.GetBytes(bodyString);
+ string tag = "TagA";
+ List<string> keys = new List<string>();
+ keys.Add("Key1");
+ keys.Add("Key2");
+
+ var msg1 = new Message(topic, tag, keys, body);
+ Assert.AreEqual(msg1.Topic, topic);
+ Assert.AreEqual(msg1.Body, body);
+ Assert.AreEqual(msg1.Tag, tag);
+ Assert.AreEqual(msg1.Keys, keys);
+ Assert.AreEqual(msg1.UserProperties.Count, 0);
+ }
+
+ [TestMethod]
+ public void testCtor5() {
+ string topic = "T1";
+ string bodyString = "body";
+ byte[] body = Encoding.ASCII.GetBytes(bodyString);
+ string tag = "TagA";
+ List<string> keys = new List<string>();
+ keys.Add("Key1");
+ keys.Add("Key2");
+
+ var msg1 = new Message(topic, tag, keys, body);
+
+ msg1.UserProperties.Add("k", "v");
+ msg1.UserProperties.Add("k2", "v2");
+
+ Assert.AreEqual(msg1.Topic, topic);
+ Assert.AreEqual(msg1.Body, body);
+ Assert.AreEqual(msg1.Tag, tag);
+ Assert.AreEqual(msg1.Keys, keys);
+ Assert.AreEqual(msg1.UserProperties.Count, 2);
+
+ string value;
+ msg1.UserProperties.TryGetValue("k", out value);
+ Assert.AreEqual(value, "v");
+
+ msg1.UserProperties.TryGetValue("k2", out value);
+ Assert.AreEqual(value, "v2");
+
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
new file mode 100644
index 0000000..94a43c2
--- /dev/null
+++ b/tests/RpcClientTest.cs
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Grpc.Core.Interceptors;
+using System.Net.Http;
+using Grpc.Net.Client;
+using rmq = global::apache.rocketmq.v1;
+using grpc = global::Grpc.Core;
+using System;
+
+namespace org.apache.rocketmq
+{
+ [TestClass]
+ public class RpcClientTest
+ {
+
+
+ [ClassInitialize]
+ public static void SetUp(TestContext context)
+ {
+ string target = string.Format("https://{0}:{1}", host, port);
+ var channel = GrpcChannel.ForAddress(target, new GrpcChannelOptions
+ {
+ HttpHandler = ClientManager.createHttpHandler()
+ });
+ var invoker = channel.Intercept(new ClientLoggerInterceptor());
+ var client = new rmq::MessagingService.MessagingServiceClient(invoker);
+ rpcClient = new RpcClient(client);
+
+ clientConfig = new ClientConfig();
+ var credentialsProvider = new ConfigFileCredentialsProvider();
+ clientConfig.CredentialsProvider = credentialsProvider;
+ clientConfig.ResourceNamespace = resourceNamespace;
+ clientConfig.Region = "cn-hangzhou-pre";
+ }
+
+ [ClassCleanup]
+ public static void TearDown()
+ {
+
+ }
+
+ [TestMethod]
+ public void testQueryRoute()
+ {
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace;
+ request.Topic.Name = topic;
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Host = host;
+ address.Port = port;
+ request.Endpoints.Addresses.Add(address);
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var callOptions = new grpc::CallOptions(metadata, deadline);
+ var response = rpcClient.queryRoute(request, callOptions).GetAwaiter().GetResult();
+ }
+
+
+ [TestMethod]
+ public void testHeartbeat()
+ {
+
+ var request = new rmq::HeartbeatRequest();
+ request.ClientId = clientId;
+ request.ProducerData = new rmq::ProducerData();
+ request.ProducerData.Group = new rmq::Resource();
+ request.ProducerData.Group.ResourceNamespace = resourceNamespace;
+ request.ProducerData.Group.Name = topic;
+ request.FifoFlag = false;
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var callOptions = new grpc::CallOptions(metadata, deadline);
+ var response = rpcClient.heartbeat(request, callOptions).GetAwaiter().GetResult();
+ }
+
+ private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+ private static string topic = "cpp_sdk_standard";
+
+ private static string clientId = "C001";
+ private static string group = "GID_cpp_sdk_standard";
+
+ private static string host = "116.62.231.199";
+ private static int port = 80;
+
+ private static IRpcClient rpcClient;
+ private static ClientConfig clientConfig;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/tests/StaticNameServerResolverTest.cs
similarity index 62%
copy from rocketmq-client-csharp/IRpcClient.cs
copy to tests/StaticNameServerResolverTest.cs
index d4102b8..88955e9 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/tests/StaticNameServerResolverTest.cs
@@ -1,29 +1,35 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading.Tasks;
-using apache.rocketmq.v1;
-using grpc = global::Grpc.Core;
-
-namespace org.apache.rocketmq
-{
- public interface IRpcClient
- {
- Task<QueryRouteResponse> queryRoute(QueryRouteRequest request, grpc::CallOptions callOptions);
-
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+ [TestClass]
+ public class StaticNameServerResolverTest
+ {
+ [TestMethod]
+ public void testResolve()
+ {
+ List<string> list = new List<string>();
+ list.Add("https://localhost:80");
+ var resolver = new StaticNameServerResolver(list);
+ var result = resolver.resolveAsync().GetAwaiter().GetResult();
+ Assert.AreSame(list, result);
+ }
+ }
+}
\ No newline at end of file