You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/03/04 09:33:17 UTC
[rocketmq-client-csharp] branch develop updated: Implement the first alpha version of PushConsumer, with many TODOs (#14)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new 684a558 Implement the first alpha version of PushConsumer, with many TODOs (#14)
684a558 is described below
commit 684a5580aff3ea0f2b1eb43fa4b3036a9046acd5
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Fri Mar 4 17:33:13 2022 +0800
Implement the first alpha version of PushConsumer, with many TODOs (#14)
Implement an initial version of PushConsumer
---
rocketmq-client-csharp/Client.cs | 256 +++++++++++++++---
rocketmq-client-csharp/ClientConfig.cs | 2 +-
rocketmq-client-csharp/ClientManager.cs | 138 +++++++++-
.../{IClient.cs => ExpressionType.cs} | 16 +-
.../{IClient.cs => FilterExpression.cs} | 25 +-
rocketmq-client-csharp/IClient.cs | 5 +-
rocketmq-client-csharp/IClientManager.cs | 16 +-
.../{IProducer.cs => IConsumer.cs} | 13 +-
.../{IClient.cs => IMessageListener.cs} | 12 +-
rocketmq-client-csharp/IProducer.cs | 2 +-
rocketmq-client-csharp/Message.cs | 13 +-
.../{IClient.cs => ProcessQueue.cs} | 25 +-
rocketmq-client-csharp/Producer.cs | 4 +-
rocketmq-client-csharp/PushConsumer.cs | 291 +++++++++++++++++++++
.../rocketmq-client-csharp.csproj | 1 +
tests/ProducerTest.cs | 3 +-
tests/{ProducerTest.cs => PushConsumerTest.cs} | 64 ++++-
tests/RpcClientTest.cs | 221 +++++++++++++++-
tests/UnitTest1.cs | 24 ++
19 files changed, 1027 insertions(+), 104 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index e628d65..04a4ed3 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -20,7 +20,7 @@ using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
using System;
-using Apache.Rocketmq.V1;
+using rmq = Apache.Rocketmq.V1;
using grpc = global::Grpc.Core;
using NLog;
@@ -29,7 +29,7 @@ namespace Org.Apache.Rocketmq
{
public abstract class Client : ClientConfig, IClient
{
- private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
public Client(INameServerResolver resolver, string resourceNamespace)
{
@@ -40,6 +40,8 @@ namespace Org.Apache.Rocketmq
_topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
_updateTopicRouteCts = new CancellationTokenSource();
+
+ _healthCheckCts = new CancellationTokenSource();
}
public virtual void Start()
@@ -55,14 +57,55 @@ namespace Org.Apache.Rocketmq
}, 30, _updateTopicRouteCts.Token);
+ schedule(async () =>
+ {
+ await HealthCheck();
+ }, 30, _healthCheckCts.Token);
+
}
- public virtual async Task Shutdown()
+ public virtual void Shutdown()
{
Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
_nameServerResolverCts.Cancel();
- await Manager.Shutdown();
+ Manager.Shutdown().GetAwaiter().GetResult();
+ }
+
+ protected string FilterBroker(Func<string, bool> acceptor)
+ {
+ foreach (var item in _topicRouteTable)
+ {
+ foreach (var partition in item.Value.Partitions)
+ {
+ string target = partition.Broker.targetUrl();
+ if (acceptor(target))
+ {
+ return target;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Return all endpoints of brokers in route table.
+ */
+ private List<string> AvailableBrokerEndpoints()
+ {
+ List<string> endpoints = new List<string>();
+ foreach (var item in _topicRouteTable)
+ {
+ foreach (var partition in item.Value.Partitions)
+ {
+ string endpoint = partition.Broker.targetUrl();
+ if (!endpoints.Contains(endpoint))
+ {
+ endpoints.Add(endpoint);
+ }
+ }
+ }
+ return endpoints;
}
private async Task UpdateNameServerList()
@@ -153,6 +196,18 @@ namespace Org.Apache.Rocketmq
});
}
+ protected rmq::Endpoints AccessEndpoint(string nameServer)
+ {
+ var endpoints = new rmq::Endpoints();
+ endpoints.Scheme = global::Apache.Rocketmq.V1.AddressScheme.Ipv4;
+ var address = new global::Apache.Rocketmq.V1.Address();
+ int pos = nameServer.LastIndexOf(':');
+ address.Host = nameServer.Substring(0, pos);
+ address.Port = Int32.Parse(nameServer.Substring(pos + 1));
+ endpoints.Addresses.Add(address);
+ return endpoints;
+ }
+
/**
* Parameters:
* topic
@@ -187,59 +242,198 @@ namespace Org.Apache.Rocketmq
// We got one or more name servers available.
int index = (_currentNameServerIndex + retry) % _nameServers.Count;
string nameServer = _nameServers[index];
- var request = new QueryRouteRequest();
- request.Topic = new Resource();
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
request.Topic.ResourceNamespace = _resourceNamespace;
request.Topic.Name = topic;
- request.Endpoints = new Endpoints();
- request.Endpoints.Scheme = global::Apache.Rocketmq.V1.AddressScheme.Ipv4;
- var address = new global::Apache.Rocketmq.V1.Address();
- int pos = nameServer.LastIndexOf(':');
- address.Host = nameServer.Substring(0, pos);
- address.Port = Int32.Parse(nameServer.Substring(pos + 1));
- request.Endpoints.Addresses.Add(address);
- var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+ request.Endpoints = AccessEndpoint(nameServer);
var metadata = new grpc.Metadata();
Signature.sign(this, metadata);
- var topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
- if (null != topicRouteData)
+ string target = $"https://{nameServer}";
+ TopicRouteData topicRouteData;
+ try
{
- if (retry > 0)
+ topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
+ if (null != topicRouteData)
{
- _currentNameServerIndex = index;
+ Logger.Debug($"Got route entries for {topic} from name server");
+ _topicRouteTable.TryAdd(topic, topicRouteData);
+
+ if (retry > 0)
+ {
+ _currentNameServerIndex = index;
+ }
+ return topicRouteData;
}
- return topicRouteData;
+ else
+ {
+ Logger.Warn($"Failed to query route of {topic} from {target}");
+ }
+ }
+ catch (System.Exception e)
+ {
+ Logger.Warn(e, "Failed when querying route");
}
}
return null;
}
- public abstract void PrepareHeartbeatData(HeartbeatRequest request);
+ public abstract void PrepareHeartbeatData(rmq::HeartbeatRequest request);
- public void Heartbeat()
+ public async Task Heartbeat()
{
- List<string> endpoints = endpointsInUse();
+ List<string> endpoints = AvailableBrokerEndpoints();
if (0 == endpoints.Count)
{
+ Logger.Debug("No broker endpoints available in topic route");
return;
}
- var heartbeatRequest = new HeartbeatRequest();
- PrepareHeartbeatData(heartbeatRequest);
+ var request = new rmq::HeartbeatRequest();
+ PrepareHeartbeatData(request);
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+
+ List<Task> tasks = new List<Task>();
+ foreach (var endpoint in endpoints)
+ {
+ tasks.Add(Manager.Heartbeat(endpoint, metadata, request, getIoTimeout()));
+ }
+
+ await Task.WhenAll(tasks);
+ }
+
+ private List<string> BlockedBrokerEndpoints()
+ {
+ List<string> endpoints = new List<string>();
+ return endpoints;
+ }
+
+ public async Task HealthCheck()
+ {
+ var request = new rmq::HealthCheckRequest();
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ List<Task<Boolean>> tasks = new List<Task<Boolean>>();
+ List<string> endpoints = BlockedBrokerEndpoints();
+ foreach (var endpoint in endpoints)
+ {
+ tasks.Add(Manager.HealthCheck(endpoint, metadata, request, getIoTimeout()));
+ }
+ var result = await Task.WhenAll(tasks);
+ int i = 0;
+ foreach (var ok in result)
+ {
+ if (ok)
+ {
+ RemoveFromBlockList(endpoints[i]);
+ }
+ ++i;
+ }
+ }
+
+ private void RemoveFromBlockList(string endpoint)
+ {
+
+ }
+
+ public async Task<List<rmq::Assignment>> scanLoadAssignment(string topic, string group)
+ {
+ // Pick a broker randomly
+ string target = FilterBroker((s) => true);
+ var request = new rmq::QueryAssignmentRequest();
+ request.ClientId = clientId();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+ request.Endpoints = AccessEndpoint(_nameServers[_currentNameServerIndex]);
+ try
+ {
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ return await Manager.QueryLoadAssignment(target, metadata, request, getIoTimeout());
+ }
+ catch (System.Exception e)
+ {
+ Logger.Warn(e, $"Failed to acquire load assignments from {target}");
+ }
+ // Just return an empty list.
+ return new List<rmq.Assignment>();
+ }
+
+ private string TargetUrl(rmq::Assignment assignment)
+ {
+ var broker = assignment.Partition.Broker;
+ var addresses = broker.Endpoints.Addresses;
+ // TODO: use the first address for now.
+ var address = addresses[0];
+ return $"https://{address.Host}:{address.Port}";
+ }
+
+
+ public async Task<List<Message>> ReceiveMessage(rmq::Assignment assignment, string group)
+ {
+ var targetUrl = TargetUrl(assignment);
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ var request = new rmq::ReceiveMessageRequest();
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+ request.Partition = assignment.Partition;
+ var messages = await Manager.ReceiveMessage(targetUrl, metadata, request, getLongPollingTimeout());
+ return messages;
+ }
+
+ public async Task<Boolean> Ack(string target, string group, string topic, string receiptHandle, String messageId)
+ {
+ var request = new rmq::AckMessageRequest();
+ request.ClientId = clientId();
+ request.ReceiptHandle = receiptHandle;
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+
+ request.MessageId = messageId;
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
+ return await Manager.Ack(target, metadata, request, getIoTimeout());
}
- public void HealthCheck()
+ public async Task<Boolean> Nack(string target, string group, string topic, string receiptHandle, String messageId)
{
+ var request = new rmq::NackMessageRequest();
+ request.ClientId = clientId();
+ request.ReceiptHandle = receiptHandle;
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = _resourceNamespace;
+ request.Group.Name = group;
+
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = _resourceNamespace;
+ request.Topic.Name = topic;
+
+ request.MessageId = messageId;
+ var metadata = new grpc::Metadata();
+ Signature.sign(this, metadata);
+ return await Manager.Nack(target, metadata, request, getIoTimeout());
}
public async Task<bool> NotifyClientTermination()
{
- List<string> endpoints = endpointsInUse();
- var request = new NotifyClientTerminationRequest();
+ List<string> endpoints = AvailableBrokerEndpoints();
+ var request = new rmq::NotifyClientTerminationRequest();
request.ClientId = clientId();
var metadata = new grpc.Metadata();
@@ -263,12 +457,6 @@ namespace Org.Apache.Rocketmq
return true;
}
- private List<string> endpointsInUse()
- {
- //TODO: gather endpoints from route entries.
- return new List<string>();
- }
-
protected readonly IClientManager Manager;
private readonly INameServerResolver _nameServerResolver;
@@ -279,6 +467,8 @@ namespace Org.Apache.Rocketmq
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteTable;
private readonly CancellationTokenSource _updateTopicRouteCts;
+ private readonly CancellationTokenSource _healthCheckCts;
+
protected const int MaxTransparentRetry = 3;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 54052ce..b708c32 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -25,7 +25,7 @@ namespace Org.Apache.Rocketmq {
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
this._ioTimeout = TimeSpan.FromSeconds(3);
- this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
+ this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
}
public string region() {
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 43af961..8640d93 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -17,15 +17,21 @@
using rmq = Apache.Rocketmq.V1;
using System;
+using System.IO;
+using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;
using grpc = Grpc.Core;
using System.Collections.Generic;
+using System.Security.Cryptography;
+using NLog;
namespace Org.Apache.Rocketmq
{
public class ClientManager : IClientManager
{
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
public ClientManager()
{
_rpcClients = new Dictionary<string, RpcClient>();
@@ -150,12 +156,23 @@ namespace Org.Apache.Rocketmq
{
var rpcClient = GetRpcClient(target);
var response = await rpcClient.Heartbeat(metadata, request, timeout);
- if (null == response)
+ Logger.Debug($"Heartbeat to {target} response status: {response.Common.Status.ToString()}");
+ return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+ }
+
+ public async Task<Boolean> HealthCheck(string target, grpc::Metadata metadata, rmq::HealthCheckRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ try
+ {
+ var response = await rpcClient.HealthCheck(metadata, request, timeout);
+ return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
+ }
+ catch (System.Exception e)
{
+ Logger.Debug(e, $"Health-check to {target} failed");
return false;
}
-
- return response.Common.Status.Code == (int)Google.Rpc.Code.Ok;
}
public async Task<rmq::SendMessageResponse> SendMessage(string target, grpc::Metadata metadata,
@@ -175,6 +192,121 @@ namespace Org.Apache.Rocketmq
return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
}
+ public async Task<List<rmq::Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, rmq::QueryAssignmentRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ rmq::QueryAssignmentResponse response = await rpcClient.QueryAssignment(metadata, request, timeout);
+ if (response.Common.Status.Code != (int)Google.Rpc.Code.Ok)
+ {
+ // TODO: Build exception hierarchy
+ throw new Exception($"Failed to query load assignment from server. Cause: {response.Common.Status.Message}");
+ }
+
+ List<rmq::Assignment> assignments = new List<rmq.Assignment>();
+ foreach (var item in response.Assignments)
+ {
+ assignments.Add(item);
+ }
+ return assignments;
+ }
+
+ public async Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, rmq::ReceiveMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ rmq::ReceiveMessageResponse response = await rpcClient.ReceiveMessage(metadata, request, timeout);
+ if (response.Common.Status.Code != (int)Google.Rpc.Code.Ok)
+ {
+ throw new Exception($"Failed to receive messages from {target}, cause: {response.Common.Status.Message}");
+ }
+
+ List<Message> messages = new List<Message>();
+ foreach (var message in response.Messages)
+ {
+ messages.Add(convert(target, message));
+ }
+ return messages;
+ }
+
+ private Message convert(string sourceHost, rmq::Message message)
+ {
+ var msg = new Message();
+ msg.Topic = message.Topic.Name;
+ msg.messageId = message.SystemAttribute.MessageId;
+ msg.Tag = message.SystemAttribute.Tag;
+
+ // Validate message body checksum
+ byte[] raw = message.Body.ToByteArray();
+ if (rmq::DigestType.Crc32 == message.SystemAttribute.BodyDigest.Type)
+ {
+ uint checksum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length);
+ if (!message.SystemAttribute.BodyDigest.Checksum.Equals(checksum.ToString("X")))
+ {
+ msg._bodyChecksumVerified = false;
+ }
+ }
+ else if (rmq::DigestType.Md5 == message.SystemAttribute.BodyDigest.Type)
+ {
+ var checksum = MD5.HashData(raw);
+ if (!message.SystemAttribute.BodyDigest.Checksum.Equals(Convert.ToHexString(checksum)))
+ {
+ msg._bodyChecksumVerified = false;
+ }
+ }
+ else if (rmq::DigestType.Sha1 == message.SystemAttribute.BodyDigest.Type)
+ {
+ var checksum = SHA1.HashData(raw);
+ if (!message.SystemAttribute.BodyDigest.Checksum.Equals(Convert.ToHexString(checksum)))
+ {
+ msg._bodyChecksumVerified = false;
+ }
+ }
+
+ foreach (var entry in message.UserAttribute)
+ {
+ msg.UserProperties.Add(entry.Key, entry.Value);
+ }
+
+ msg._receiptHandle = message.SystemAttribute.ReceiptHandle;
+ msg._sourceHost = sourceHost;
+
+ foreach (var key in message.SystemAttribute.Keys)
+ {
+ msg.Keys.Add(key);
+ }
+
+ msg._deliveryAttempt = message.SystemAttribute.DeliveryAttempt;
+
+ if (message.SystemAttribute.BodyEncoding == rmq::Encoding.Gzip)
+ {
+ // Decompress/Inflate message body
+ var inputStream = new MemoryStream(message.Body.ToByteArray());
+ var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress);
+ var outputStream = new MemoryStream();
+ gzipStream.CopyTo(outputStream);
+ msg.Body = outputStream.ToArray();
+ }
+ else
+ {
+ msg.Body = message.Body.ToByteArray();
+ }
+
+ return msg;
+ }
+
+ public async Task<Boolean> Ack(string target, grpc::Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ var response = await rpcClient.AckMessage(metadata, request, timeout);
+ return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+ }
+
+ public async Task<Boolean> Nack(string target, grpc::Metadata metadata, rmq::NackMessageRequest request, TimeSpan timeout)
+ {
+ var rpcClient = GetRpcClient(target);
+ var response = await rpcClient.NackMessage(metadata, request, timeout);
+ return response.Common.Status.Code == ((int)Google.Rpc.Code.Ok);
+ }
+
public async Task Shutdown()
{
_clientLock.EnterReadLock();
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/ExpressionType.cs
similarity index 82%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/ExpressionType.cs
index 7749c07..0caaf8e 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/ExpressionType.cs
@@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-using System.Threading.Tasks;
-
namespace Org.Apache.Rocketmq
{
- public interface IClient : IClientConfig
- {
-
- void Heartbeat();
-
- void HealthCheck();
-
- Task<bool> NotifyClientTermination();
+ public enum ExpressionType
+ {
+ TAG,
+ SQL92,
}
+
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/FilterExpression.cs
similarity index 65%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/FilterExpression.cs
index 7749c07..0dc0b92 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/FilterExpression.cs
@@ -14,19 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-using System.Threading.Tasks;
-
namespace Org.Apache.Rocketmq
{
- public interface IClient : IClientConfig
+ public class FilterExpression
{
- void Heartbeat();
+ public FilterExpression(string expression, ExpressionType type)
+ {
+ _expression = expression;
+ _type = type;
+ }
+
+ private ExpressionType _type;
+
+ private string _expression;
- void HealthCheck();
+ public ExpressionType Type
+ {
+ get { return _type; }
+ }
- Task<bool> NotifyClientTermination();
+ public string Expression
+ {
+ get { return _expression; }
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 7749c07..209871f 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -16,15 +16,16 @@
*/
using System.Threading.Tasks;
+using System;
namespace Org.Apache.Rocketmq
{
public interface IClient : IClientConfig
{
- void Heartbeat();
+ Task Heartbeat();
- void HealthCheck();
+ Task HealthCheck();
Task<bool> NotifyClientTermination();
diff --git a/rocketmq-client-csharp/IClientManager.cs b/rocketmq-client-csharp/IClientManager.cs
index 3102f60..f5538f1 100644
--- a/rocketmq-client-csharp/IClientManager.cs
+++ b/rocketmq-client-csharp/IClientManager.cs
@@ -18,7 +18,10 @@
using Apache.Rocketmq.V1;
using System.Threading.Tasks;
using System;
+using System.Collections.Generic;
using grpc = global::Grpc.Core;
+using rmq = Apache.Rocketmq.V1;
+
namespace Org.Apache.Rocketmq {
public interface IClientManager {
@@ -26,12 +29,23 @@ namespace Org.Apache.Rocketmq {
Task<TopicRouteData> ResolveRoute(string target, grpc::Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
- Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
+ Task<Boolean> Heartbeat(string target, grpc::Metadata metadata, rmq::HeartbeatRequest request, TimeSpan timeout);
+
+ Task<Boolean> HealthCheck(string target, grpc::Metadata metadata, rmq::HealthCheckRequest request, TimeSpan timeout);
Task<Boolean> NotifyClientTermination(string target, grpc::Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout);
Task<SendMessageResponse> SendMessage(string target, grpc::Metadata metadata, SendMessageRequest request, TimeSpan timeout);
+ Task<List<Assignment>> QueryLoadAssignment(string target, grpc::Metadata metadata, QueryAssignmentRequest request, TimeSpan timeout);
+
+ Task<List<Message>> ReceiveMessage(string target, grpc::Metadata metadata, ReceiveMessageRequest request, TimeSpan timeout);
+
+
+ Task<Boolean> Ack(string target, grpc::Metadata metadata, AckMessageRequest request, TimeSpan timeout);
+
+ Task<Boolean> Nack(string target, grpc::Metadata metadata, NackMessageRequest request, TimeSpan timeout);
+
Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IConsumer.cs
similarity index 82%
copy from rocketmq-client-csharp/IProducer.cs
copy to rocketmq-client-csharp/IConsumer.cs
index 43a7815..ac4d787 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,15 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-using System.Threading.Tasks;
-
-namespace Org.Apache.Rocketmq {
- public interface IProducer {
+namespace Org.Apache.Rocketmq
+{
+ public interface IConsumer
+ {
void Start();
- Task Shutdown();
-
- Task<SendResult> Send(Message message);
+ void Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IMessageListener.cs
similarity index 85%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/IMessageListener.cs
index 7749c07..f46efd5 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IMessageListener.cs
@@ -15,18 +15,16 @@
* limitations under the License.
*/
+using System.Collections.Generic;
using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
- public interface IClient : IClientConfig
- {
-
- void Heartbeat();
-
- void HealthCheck();
- Task<bool> NotifyClientTermination();
+ public interface IMessageListener
+ {
+ Task Consume(List<Message> messages, List<Message> failed);
}
+
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 43a7815..8819c4f 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -21,7 +21,7 @@ namespace Org.Apache.Rocketmq {
public interface IProducer {
void Start();
- Task Shutdown();
+ void Shutdown();
Task<SendResult> Send(Message message);
}
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 66fd47d..5cbf1aa 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -39,12 +39,23 @@ namespace Org.Apache.Rocketmq
this.systemProperties = new Dictionary<string, string>();
}
- private string messageId;
+ internal string messageId;
public string MessageId
{
get { return messageId; }
}
+ internal string _receiptHandle;
+ internal string _sourceHost;
+
+ internal int _deliveryAttempt;
+ public int DeliveryAttempt
+ {
+ get { return _deliveryAttempt; }
+ }
+
+ internal bool _bodyChecksumVerified = true;
+
private string topic;
public string Topic {
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/ProcessQueue.cs
similarity index 61%
copy from rocketmq-client-csharp/IClient.cs
copy to rocketmq-client-csharp/ProcessQueue.cs
index 7749c07..1022978 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/ProcessQueue.cs
@@ -14,19 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-using System.Threading.Tasks;
-
+using System;
namespace Org.Apache.Rocketmq
{
- public interface IClient : IClientConfig
+ public class ProcessQueue
{
- void Heartbeat();
+ public ProcessQueue()
+ {
+ _lastReceivedTime = DateTime.UtcNow;
+ }
+ public bool Dropped { get; set; }
+
+ private DateTime _lastReceivedTime;
- void HealthCheck();
+ public DateTime LastReceiveTime
+ {
+ get { return _lastReceivedTime; }
+ set { _lastReceivedTime = value; }
+ }
- Task<bool> NotifyClientTermination();
+ internal bool Expired()
+ {
+ return DateTime.UtcNow.Subtract(_lastReceivedTime).TotalMilliseconds > 30 * 1000;
+ }
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 388a4a6..e2e023d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq
// More initalization
}
- public override async Task Shutdown()
+ public override void Shutdown()
{
// Release local resources
- await base.Shutdown();
+ base.Shutdown();
}
public override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
new file mode 100644
index 0000000..382dbfb
--- /dev/null
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.Rocketmq.V1;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V1;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+ public class PushConsumer : Client, IConsumer
+ {
+ public PushConsumer(INameServerResolver resolver, string resourceNamespace, string group) : base(resolver, resourceNamespace)
+ {
+ _group = group;
+ _topicFilterExpressionMap = new ConcurrentDictionary<string, FilterExpression>();
+ _topicAssignmentsMap = new ConcurrentDictionary<string, List<rmq::Assignment>>();
+ _processQueueMap = new ConcurrentDictionary<Assignment, ProcessQueue>();
+ _scanAssignmentCTS = new CancellationTokenSource();
+ _scanExpiredProcessQueueCTS = new CancellationTokenSource();
+ }
+
+ public override async void Start()
+ {
+ if (null == _messageListener)
+ {
+ throw new System.Exception("Bad configuration: message listener is required");
+ }
+
+ base.Start();
+
+ // Step-1: Resolve topic routes
+ List<Task<TopicRouteData>> queryRouteTasks = new List<Task<TopicRouteData>>();
+ foreach (var item in _topicFilterExpressionMap)
+ {
+ queryRouteTasks.Add(GetRouteFor(item.Key, true));
+ }
+ Task.WhenAll(queryRouteTasks).GetAwaiter().GetResult();
+
+ // Step-2: Send heartbeats to all involving brokers so that we may get immediate, valid assignments.
+ await Heartbeat();
+
+ // Step-3: Scan load assignments that are assigned to current client
+ schedule(async () =>
+ {
+ await scanLoadAssignments();
+ }, 10, _scanAssignmentCTS.Token);
+
+ schedule(() =>
+ {
+ ScanExpiredProcessQueue();
+ }, 10, _scanExpiredProcessQueueCTS.Token);
+ }
+
+ public override void Shutdown()
+ {
+ _scanAssignmentCTS.Cancel();
+ _scanExpiredProcessQueueCTS.Cancel();
+
+ // Shutdown resources of derived class
+ base.Shutdown();
+ }
+
+ private async Task scanLoadAssignments()
+ {
+ Logger.Debug("Start to scan load assignments from server");
+ List<Task<List<Assignment>>> tasks = new List<Task<List<Assignment>>>();
+ foreach (var item in _topicFilterExpressionMap)
+ {
+ tasks.Add(scanLoadAssignment(item.Key, _group));
+ }
+ var result = await Task.WhenAll(tasks);
+
+ foreach (var assignments in result)
+ {
+ if (assignments.Count == 0)
+ {
+ continue;
+ }
+
+ checkAndUpdateAssignments(assignments);
+ }
+ Logger.Debug("Completed scanning load assignments");
+ }
+
+ private void ScanExpiredProcessQueue()
+ {
+ foreach (var item in _processQueueMap)
+ {
+ if (item.Value.Expired())
+ {
+ Task.Run(async () =>
+ {
+ await ExecutePop0(item.Key);
+ });
+ }
+ }
+ }
+
+ private void checkAndUpdateAssignments(List<Assignment> assignments)
+ {
+ if (assignments.Count == 0)
+ {
+ return;
+ }
+
+ string topic = assignments[0].Partition.Topic.Name;
+
+ // Compare to generate or cancel pop-cycles
+ List<Assignment> existing;
+ _topicAssignmentsMap.TryGetValue(topic, out existing);
+
+ foreach (var assignment in assignments)
+ {
+ if (null == existing || !existing.Contains(assignment))
+ {
+ ExecutePop(assignment);
+ }
+ }
+
+ if (null != existing)
+ {
+ foreach (var assignment in existing)
+ {
+ if (!assignments.Contains(assignment))
+ {
+ Logger.Info($"Stop receiving messages from {assignment.Partition.ToString()}");
+ CancelPop(assignment);
+ }
+ }
+ }
+
+ }
+
+ private void ExecutePop(Assignment assignment)
+ {
+ var processQueue = new ProcessQueue();
+ if (_processQueueMap.TryAdd(assignment, processQueue))
+ {
+ Task.Run(async () =>
+ {
+ await ExecutePop0(assignment);
+ });
+ }
+ }
+
+ private async Task ExecutePop0(Assignment assignment)
+ {
+ Logger.Info($"Start to pop {assignment.Partition.ToString()}");
+ while (true)
+ {
+ try
+ {
+ ProcessQueue processQueue;
+ if (!_processQueueMap.TryGetValue(assignment, out processQueue))
+ {
+ break;
+ }
+
+ if (processQueue.Dropped)
+ {
+ break;
+ }
+
+ List<Message> messages = await base.ReceiveMessage(assignment, _group);
+ processQueue.LastReceiveTime = System.DateTime.UtcNow;
+
+ // TODO: cache message and dispatch them
+
+ List<Message> failed = new List<Message>();
+ await _messageListener.Consume(messages, failed);
+
+ foreach (var message in failed)
+ {
+ await base.Nack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+ }
+
+ foreach (var message in messages)
+ {
+ if (!failed.Contains(message))
+ {
+ bool success = await base.Ack(message._sourceHost, _group, message.Topic, message._receiptHandle, message.MessageId);
+ if (!success)
+ {
+ //TODO: log error.
+ }
+ }
+ }
+ }
+ catch (System.Exception e)
+ {
+ // TODO: log exception raised.
+ }
+
+
+ }
+ }
+
+ private void CancelPop(Assignment assignment)
+ {
+ if (!_processQueueMap.ContainsKey(assignment))
+ {
+ return;
+ }
+
+ ProcessQueue processQueue;
+ if (_processQueueMap.Remove(assignment, out processQueue))
+ {
+ processQueue.Dropped = true;
+ }
+ }
+
+ public override void PrepareHeartbeatData(HeartbeatRequest request)
+ {
+ request.ClientId = clientId();
+ var consumerData = new ConsumerData();
+ consumerData.ConsumeType = ConsumeMessageType.Passive;
+ consumerData.ConsumeModel = ConsumeModel.Clustering;
+ consumerData.Group = new Resource();
+ consumerData.Group.ResourceNamespace = resourceNamespace();
+ consumerData.Group.Name = _group;
+
+ foreach (var item in _topicFilterExpressionMap)
+ {
+ var sub = new SubscriptionEntry();
+ sub.Topic = new Resource();
+ sub.Topic.ResourceNamespace = _resourceNamespace;
+ sub.Topic.Name = item.Key;
+
+ sub.Expression = new rmq::FilterExpression();
+ switch (item.Value.Type)
+ {
+ case ExpressionType.TAG:
+ sub.Expression.Type = rmq::FilterType.Tag;
+ break;
+ case ExpressionType.SQL92:
+ sub.Expression.Type = rmq::FilterType.Sql;
+ break;
+ }
+ sub.Expression.Expression = item.Value.Expression;
+ consumerData.Subscriptions.Add(sub);
+ }
+ request.ConsumerData = consumerData;
+ }
+
+ public void Subscribe(string topic, string expression, ExpressionType type)
+ {
+ var filterExpression = new FilterExpression(expression, type);
+ _topicFilterExpressionMap[topic] = filterExpression;
+
+ }
+
+ public void RegisterListener(IMessageListener listener)
+ {
+ if (null != listener)
+ {
+ _messageListener = listener;
+ }
+ }
+
+ private string _group;
+
+ private ConcurrentDictionary<string, FilterExpression> _topicFilterExpressionMap;
+ private IMessageListener _messageListener;
+
+ private CancellationTokenSource _scanAssignmentCTS;
+
+ private ConcurrentDictionary<string, List<rmq::Assignment>> _topicAssignmentsMap;
+
+ private ConcurrentDictionary<Assignment, ProcessQueue> _processQueueMap;
+
+ private CancellationTokenSource _scanExpiredProcessQueueCTS;
+
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index 7129e9f..fda655c 100644
--- a/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -11,6 +11,7 @@
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Crc32.NET" Version="1.2.0" />
<PackageReference Include="Google.Protobuf" Version="3.19.4" />
<PackageReference Include="Grpc.Net.Client" Version="2.42.0" />
<PackageReference Include="Grpc.Tools" Version="2.43.0">
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index a2ef2c7..7dde545 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -46,7 +46,6 @@ namespace Org.Apache.Rocketmq
public void testSendMessage()
{
var producer = new Producer(resolver, resourceNamespace);
- producer.ResourceNamespace = resourceNamespace;
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
producer.Start();
@@ -55,7 +54,7 @@ namespace Org.Apache.Rocketmq
var msg = new Message(topic, body);
var sendResult = producer.Send(msg).GetAwaiter().GetResult();
Assert.IsNotNull(sendResult);
- producer.Shutdown().GetAwaiter().GetResult();
+ producer.Shutdown();
}
private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
diff --git a/tests/ProducerTest.cs b/tests/PushConsumerTest.cs
similarity index 53%
copy from tests/ProducerTest.cs
copy to tests/PushConsumerTest.cs
index a2ef2c7..5250bb8 100644
--- a/tests/ProducerTest.cs
+++ b/tests/PushConsumerTest.cs
@@ -17,12 +17,37 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Collections.Generic;
using System;
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
+ public class TestMessageListener : IMessageListener
+ {
+ public async Task Consume(List<Message> messages, List<Message> failed)
+ {
+ foreach (var message in messages)
+ {
+ Console.WriteLine("");
+ }
+ }
+
+ }
+
+ public class CountableMessageListener : IMessageListener
+ {
+ public async Task Consume(List<Message> messages, List<Message> failed)
+ {
+ foreach (var message in messages)
+ {
+ Console.WriteLine("{}", message.MessageId);
+ }
+ }
+
+ }
+
[TestClass]
- public class ProducerTest
+ public class PushConsumerTest
{
[ClassInitialize]
@@ -41,23 +66,35 @@ namespace Org.Apache.Rocketmq
}
+ [TestMethod]
+ public void testLifecycle()
+ {
+ var consumer = new PushConsumer(resolver, resourceNamespace, group);
+ consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ consumer.Region = "cn-hangzhou-pre";
+ consumer.Subscribe(topic, "*", ExpressionType.TAG);
+ consumer.RegisterListener(new TestMessageListener());
+ consumer.Start();
+ consumer.Shutdown();
+ }
+
+
+ // [Ignore]
[TestMethod]
- public void testSendMessage()
+ public void testConsumeMessage()
{
- var producer = new Producer(resolver, resourceNamespace);
- producer.ResourceNamespace = resourceNamespace;
- producer.CredentialsProvider = new ConfigFileCredentialsProvider();
- producer.Region = "cn-hangzhou-pre";
- producer.Start();
- byte[] body = new byte[1024];
- Array.Fill(body, (byte)'x');
- var msg = new Message(topic, body);
- var sendResult = producer.Send(msg).GetAwaiter().GetResult();
- Assert.IsNotNull(sendResult);
- producer.Shutdown().GetAwaiter().GetResult();
+ var consumer = new PushConsumer(resolver, resourceNamespace, group);
+ consumer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ consumer.Region = "cn-hangzhou-pre";
+ consumer.Subscribe(topic, "*", ExpressionType.TAG);
+ consumer.RegisterListener(new CountableMessageListener());
+ consumer.Start();
+ System.Threading.Thread.Sleep(System.TimeSpan.FromSeconds(300));
+ consumer.Shutdown();
}
+
private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
private static string topic = "cpp_sdk_standard";
@@ -69,6 +106,7 @@ namespace Org.Apache.Rocketmq
private static ICredentialsProvider credentialsProvider;
private static string host = "116.62.231.199";
private static int port = 80;
+
}
}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 40e2b64..c66318d 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -15,13 +15,12 @@
* limitations under the License.
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Grpc.Core.Interceptors;
-using System.Net.Http;
-using Grpc.Net.Client;
using rmq = global::Apache.Rocketmq.V1;
using grpc = global::Grpc.Core;
using System;
using pb = global::Google.Protobuf;
+using System.Collections.Generic;
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
@@ -65,8 +64,20 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
-
- var response = rpcClient.QueryRoute(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+
+ // Execute route query multiple times.
+ List<Task<rmq::QueryRouteResponse>> tasks = new List<Task<rmq.QueryRouteResponse>>();
+ for (int i = 0; i < 16; i++)
+ {
+ tasks.Add(rpcClient.QueryRoute(metadata, request, clientConfig.getIoTimeout()));
+ }
+
+ // Verify
+ var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+ foreach (var item in result)
+ {
+ Assert.AreEqual(0, item.Common.Status.Code);
+ }
}
@@ -85,6 +96,7 @@ namespace Org.Apache.Rocketmq
Signature.sign(clientConfig, metadata);
var response = rpcClient.Heartbeat(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ Assert.AreEqual("ok", response.Common.Status.Message);
}
[TestMethod]
@@ -114,6 +126,202 @@ namespace Org.Apache.Rocketmq
var response = rpcClient.SendMessage(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
}
+ [TestMethod]
+ public void testHealthCheck()
+ {
+ var request = new rmq::HealthCheckRequest();
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = resourceNamespace;
+ request.Group.Name = group;
+ request.ClientHost = "test";
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+ var response = rpcClient.HealthCheck(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ Assert.AreEqual("ok", response.Common.Status.Message);
+ }
+
+ [TestMethod]
+ public void HeartbeatAsConsumer()
+ {
+ var request = new rmq::HeartbeatRequest();
+ request.ClientId = clientId;
+ request.ConsumerData = new rmq::ConsumerData();
+ request.ConsumerData.Group = new rmq::Resource();
+ request.ConsumerData.Group.ResourceNamespace = resourceNamespace;
+ request.ConsumerData.Group.Name = group;
+
+ request.ConsumerData.ConsumeModel = rmq::ConsumeModel.Clustering;
+ request.ConsumerData.ConsumePolicy = rmq::ConsumePolicy.Resume;
+ request.ConsumerData.ConsumeType = rmq::ConsumeMessageType.Passive;
+
+ var subscription = new rmq::SubscriptionEntry();
+ subscription.Topic = new rmq::Resource();
+ subscription.Topic.ResourceNamespace = resourceNamespace;
+ subscription.Topic.Name = topic;
+ subscription.Expression = new rmq::FilterExpression();
+ subscription.Expression.Type = rmq::FilterType.Tag;
+ subscription.Expression.Expression = "*";
+ request.ConsumerData.Subscriptions.Add(subscription);
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+
+ var response = rpcClient.Heartbeat(metadata, request, TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ Assert.AreEqual("ok", response.Common.Status.Message);
+ }
+
+ private rmq::QueryAssignmentResponse queryAssignments()
+ {
+ HeartbeatAsConsumer();
+ var request = new rmq::QueryAssignmentRequest();
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Host = host;
+ address.Port = port;
+ request.Endpoints.Addresses.Add(address);
+
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = resourceNamespace;
+ request.Group.Name = group;
+
+ request.ClientId = clientId;
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace;
+ request.Topic.Name = topic;
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+
+ var response = rpcClient.QueryAssignment(metadata, request, System.TimeSpan.FromSeconds(3)).GetAwaiter().GetResult();
+ return response;
+ }
+
+ [TestMethod]
+ public void testQueryAssignment()
+ {
+ var response = queryAssignments();
+
+ Assert.AreEqual("ok", response.Common.Status.Message);
+ Assert.IsTrue(response.Assignments.Count > 0);
+ }
+
+ private rmq::ReceiveMessageResponse DoReceiveMessage()
+ {
+ var assignmentsResponse = queryAssignments();
+ Assert.IsTrue(assignmentsResponse.Assignments.Count > 0);
+ var assignment = assignmentsResponse.Assignments[0];
+
+ // Send some prior messages
+ for (int i = 0; i < batchSize; i++)
+ {
+ testSendMessage();
+ }
+
+ var request = new rmq::ReceiveMessageRequest();
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = resourceNamespace;
+ request.Group.Name = group;
+
+ request.ClientId = clientId;
+ request.Partition = assignment.Partition;
+
+ request.FilterExpression = new rmq::FilterExpression();
+ request.FilterExpression.Type = rmq::FilterType.Tag;
+ request.FilterExpression.Expression = "*";
+
+ request.ConsumePolicy = rmq::ConsumePolicy.Resume;
+ request.BatchSize = batchSize;
+
+ request.InvisibleDuration = new Google.Protobuf.WellKnownTypes.Duration();
+ request.InvisibleDuration.Seconds = 10;
+ request.InvisibleDuration.Nanos = 0;
+
+ request.AwaitTime = new Google.Protobuf.WellKnownTypes.Duration();
+ request.AwaitTime.Seconds = 10;
+ request.AwaitTime.Nanos = 0;
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+ var response = rpcClient.ReceiveMessage(metadata, request, TimeSpan.FromSeconds(15)).GetAwaiter().GetResult();
+ return response;
+ }
+
+ [TestMethod]
+ public void testReceiveMessage()
+ {
+ var response = DoReceiveMessage();
+ Assert.AreEqual(0, response.Common.Status.Code);
+ Assert.IsTrue(response.Messages.Count > 0);
+ }
+
+ [TestMethod]
+ public void testAck()
+ {
+ var receiveMessageResponse = DoReceiveMessage();
+
+ List<Task<rmq::AckMessageResponse>> tasks = new List<Task<rmq.AckMessageResponse>>();
+
+ foreach (var message in receiveMessageResponse.Messages)
+ {
+ var request = new rmq::AckMessageRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace;
+ request.Topic.Name = topic;
+
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = resourceNamespace;
+ request.Group.Name = group;
+
+ request.ClientId = clientId;
+
+ request.ReceiptHandle = message.SystemAttribute.ReceiptHandle;
+ request.MessageId = message.SystemAttribute.MessageId;
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+ tasks.Add(rpcClient.AckMessage(metadata, request, TimeSpan.FromSeconds(3)));
+ }
+ var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+ foreach (var item in result)
+ {
+ Assert.AreEqual("ok", item.Common.Status.Message);
+ }
+ }
+
+ [TestMethod]
+ public void testNack()
+ {
+ var receiveMessageResponse = DoReceiveMessage();
+ List<Task<rmq::NackMessageResponse>> tasks = new List<Task<rmq.NackMessageResponse>>();
+ foreach (var message in receiveMessageResponse.Messages)
+ {
+ var request = new rmq::NackMessageRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.ResourceNamespace = resourceNamespace;
+ request.Topic.Name = topic;
+
+ request.Group = new rmq::Resource();
+ request.Group.ResourceNamespace = resourceNamespace;
+ request.Group.Name = group;
+
+ request.ClientId = clientId;
+
+ request.ReceiptHandle = message.SystemAttribute.ReceiptHandle;
+ request.MessageId = message.SystemAttribute.MessageId;
+ request.DeliveryAttempt = 1;
+ request.MaxDeliveryAttempts = 16;
+
+ var metadata = new grpc::Metadata();
+ Signature.sign(clientConfig, metadata);
+ tasks.Add(rpcClient.NackMessage(metadata, request, TimeSpan.FromSeconds(3)));
+ }
+ var result = Task.WhenAll(tasks).GetAwaiter().GetResult();
+ foreach (var item in result)
+ {
+ Assert.AreEqual(0, item.Common.Status.Code);
+ }
+ }
+
// Remove the Ignore annotation if server has fixed
[Ignore]
[TestMethod]
@@ -141,7 +349,10 @@ namespace Org.Apache.Rocketmq
private static string host = "116.62.231.199";
private static int port = 80;
+ private static int batchSize = 32;
+
private static IRpcClient rpcClient;
+
private static ClientConfig clientConfig;
}
}
\ No newline at end of file
diff --git a/tests/UnitTest1.cs b/tests/UnitTest1.cs
index acbad60..e622616 100644
--- a/tests/UnitTest1.cs
+++ b/tests/UnitTest1.cs
@@ -4,6 +4,9 @@ using Grpc.Net.Client;
using Apache.Rocketmq.V1;
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
namespace tests
{
[TestClass]
@@ -45,5 +48,26 @@ namespace tests
public void TestRpcClientImplCtor() {
RpcClient impl = new RpcClient("https://localhost:5001");
}
+
+ [TestMethod]
+ public void TestConcurrentDictionary()
+ {
+ var dict = new ConcurrentDictionary<string, List<String>>();
+ string s = "abc";
+ List<String> result;
+ var exists = dict.TryGetValue(s, out result);
+ Assert.IsFalse(exists);
+ Assert.IsNull(result);
+
+ result = new List<string>();
+ result.Add("abc");
+ Assert.IsTrue(dict.TryAdd(s, result));
+
+ List<String> list;
+ exists = dict.TryGetValue(s, out list);
+ Assert.IsTrue(exists);
+ Assert.IsNotNull(list);
+ Assert.AreEqual(1, list.Count);
+ }
}
}