You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:47 UTC
[rocketmq-clients] 07/28: Implement simple consumer
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 26bb67c51c279013fabd32e4eeb7809aaf9e4f56
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 13 17:13:57 2023 +0800
Implement simple consumer
---
csharp/examples/ProducerDelayMessageExample.cs | 77 +++++----
csharp/examples/ProducerNormalMessageExample.cs | 6 +-
csharp/examples/SimpleConsumerExample.cs | 68 ++++----
csharp/rocketmq-client-csharp/Address.cs | 25 ++-
csharp/rocketmq-client-csharp/AddressScheme.cs | 10 +-
csharp/rocketmq-client-csharp/Broker.cs | 10 ++
csharp/rocketmq-client-csharp/Client.cs | 25 +--
csharp/rocketmq-client-csharp/ClientManager.cs | 33 ++--
csharp/rocketmq-client-csharp/Consumer.cs | 99 +++++++++++
.../rocketmq-client-csharp/Error/StatusChecker.cs | 41 -----
csharp/rocketmq-client-csharp/FilterExpression.cs | 6 +
csharp/rocketmq-client-csharp/IClientManager.cs | 33 ++++
csharp/rocketmq-client-csharp/MessageQueue.cs | 27 ++-
csharp/rocketmq-client-csharp/MessageView.cs | 44 ++---
csharp/rocketmq-client-csharp/MqEncoding.cs | 10 +-
csharp/rocketmq-client-csharp/Permission.cs | 22 ++-
csharp/rocketmq-client-csharp/Producer.cs | 8 +-
csharp/rocketmq-client-csharp/Publishing.cs | 4 +-
.../PublishingLoadBalancer.cs | 33 ++--
.../rocketmq-client-csharp/PublishingSettings.cs | 17 +-
.../{IProducer.cs => ReceiveMessageResult.cs} | 14 +-
csharp/rocketmq-client-csharp/Resource.cs | 8 +-
csharp/rocketmq-client-csharp/RpcClient.cs | 42 ++---
csharp/rocketmq-client-csharp/Settings.cs | 10 +-
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 192 +++++++++++++++++++++
.../SimpleSubscriptionSettings.cs | 95 ++++++++++
csharp/rocketmq-client-csharp/StatusChecker.cs | 1 +
.../SubscriptionLoadBalancer.cs | 38 ++--
csharp/rocketmq-client-csharp/TopicRouteData.cs | 10 +-
csharp/rocketmq-client-csharp/Utilities.cs | 5 +-
csharp/tests/UnitTest1.cs | 51 +++---
31 files changed, 748 insertions(+), 316 deletions(-)
diff --git a/csharp/examples/ProducerDelayMessageExample.cs b/csharp/examples/ProducerDelayMessageExample.cs
index 27e32e76..9ad5fbb9 100644
--- a/csharp/examples/ProducerDelayMessageExample.cs
+++ b/csharp/examples/ProducerDelayMessageExample.cs
@@ -24,49 +24,50 @@ using Org.Apache.Rocketmq;
namespace examples
{
- static class ProducerDelayMessageExample
+ internal static class ProducerDelayMessageExample
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
internal static async Task QuickStart()
{
- // string accessKey = "yourAccessKey";
- // string secretKey = "yourSecretKey";
- // // Credential provider is optional for client configuration.
- // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- // string endpoints = "foobar.com:8080";
- // // In most case, you don't need to create too many producers, single pattern is recommended.
- // var producer = new Producer(endpoints)
- // {
- // CredentialsProvider = credentialsProvider
- // };
- // string topic = "yourDelayTopic";
- // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // // the topic route before message publishing.
- // producer.AddTopicOfInterest(topic);
- //
- // await producer.Start();
- // // Define your message body.
- // var bytes = Encoding.UTF8.GetBytes("foobar");
- // string tag = "yourMessageTagA";
- // // You could set multiple keys for the single message.
- // var keys = new List<string>
- // {
- // "yourMessageKey-2f00df144e48",
- // "yourMessageKey-49df1dd332b7"
- // };
- // // Set topic for current message.
- // var message = new Message(topic, bytes)
- // {
- // Tag = tag,
- // Keys = keys,
- // // Essential for DELAY message.
- // DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30)
- // };
- // var sendReceipt = await producer.Send(message);
- // Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
- // // Close the producer if you don't need it anymore.
- // await producer.Shutdown();
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
+ // Credential provider is optional for client configuration.
+ var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+ const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+ var clientConfig = new ClientConfig(endpoints)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ // In most case, you don't need to create too many producers, single pattern is recommended.
+ var producer = new Producer(clientConfig);
+ const string topic = "yourDelayTopic";
+ // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
+ // the topic route before message publishing.
+ producer.SetTopics(topic);
+
+ await producer.Start();
+ // Define your message body.
+ var bytes = Encoding.UTF8.GetBytes("foobar");
+ const string tag = "yourMessageTagA";
+ // You could set multiple keys for the single message.
+ var keys = new List<string>
+ {
+ "yourMessageKey-2f00df144e48",
+ "yourMessageKey-49df1dd332b7"
+ };
+ // Set topic for current message.
+ var message = new Message(topic, bytes)
+ {
+ Tag = tag,
+ Keys = keys,
+ // Essential for DELAY message.
+ DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(30)
+ };
+ var sendReceipt = await producer.Send(message);
+ Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+ // Close the producer if you don't need it anymore.
+ await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 16791a13..3274ade2 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -32,6 +32,7 @@ namespace examples
{
const string accessKey = "5jFk0wK7OU6Uq395";
const string secretKey = "V1u8z19URHs4o6RQ";
+
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
@@ -41,12 +42,11 @@ namespace examples
};
// In most case, you don't need to create too many producers, single pattern is recommended.
var producer = new Producer(clientConfig);
-
+
const string topic = "lingchu_normal_topic";
producer.SetTopics(topic);
// Set the topic name(s), which is optional but recommended. It makes producer could prefetch
// the topic route before message publishing.
-
await producer.Start();
// Define your message body.
var bytes = Encoding.UTF8.GetBytes("foobar");
@@ -67,7 +67,7 @@ namespace examples
Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
Thread.Sleep(9999999);
// Close the producer if you don't need it anymore.
- // await producer.Shutdown();
+ await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/SimpleConsumerExample.cs b/csharp/examples/SimpleConsumerExample.cs
index 80299fbb..b41125c8 100644
--- a/csharp/examples/SimpleConsumerExample.cs
+++ b/csharp/examples/SimpleConsumerExample.cs
@@ -23,47 +23,43 @@ using Org.Apache.Rocketmq;
namespace examples
{
- static class SimpleConsumerExample
+ internal static class SimpleConsumerExample
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
internal static async Task QuickStart()
{
- // string accessKey = "yourAccessKey";
- // string secretKey = "yourSecretKey";
- // // Credential provider is optional for client configuration.
- // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- // string endpoints = "foobar.com:8080";
- //
- // string consumerGroup = "yourConsumerGroup";
- // SimpleConsumer simpleConsumer = new SimpleConsumer(endpoints, consumerGroup)
- // {
- // CredentialsProvider = credentialsProvider
- // };
- //
- // string topic = "yourTopic";
- // string tag = "tagA";
- // // Set topic subscription for consumer.
- // simpleConsumer.Subscribe(topic, new FilterExpression(tag, ExpressionType.Tag));
- // await simpleConsumer.Start();
- //
- // int maxMessageNum = 16;
- // TimeSpan invisibleDuration = TimeSpan.FromSeconds(15);
- // var messages = await simpleConsumer.Receive(maxMessageNum, invisibleDuration);
- // Logger.Info($"{messages.Count} messages has been received.");
- //
- // var tasks = new List<Task>();
- // foreach (var message in messages)
- // {
- // Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}.");
- // var task = simpleConsumer.Ack(message);
- // tasks.Add(task);
- // }
- //
- // await Task.WhenAll(tasks);
- // Logger.Info($"{tasks.Count} messages have been acknowledged.");
- // // Close the consumer if you don't need it anymore.
- // await simpleConsumer.Shutdown();
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
+
+ // Credential provider is optional for client configuration.
+ var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+ const string endpoints = "foobar.com:8080";
+ var clientConfig = new ClientConfig(endpoints)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ // Add your subscriptions.
+ const string consumerGroup = "yourConsumerGroup";
+ var subscription = new Dictionary<string, FilterExpression>
+ { { consumerGroup, new FilterExpression("*") } };
+ // In most case, you don't need to create too many consumers, single pattern is recommended.
+ var simpleConsumer =
+ new SimpleConsumer(clientConfig, consumerGroup, TimeSpan.FromSeconds(15), subscription);
+
+ await simpleConsumer.Start();
+ var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+ foreach (var message in messageViews)
+ {
+ Logger.Info($"Received a message, topic={message.Topic}, message-id={message.MessageId}");
+ await simpleConsumer.Ack(message);
+ Logger.Info($"Message is acknowledged successfully, message-id={message.MessageId}");
+ // await simpleConsumer.ChangeInvisibleDuration(message, TimeSpan.FromSeconds(15));
+ // Logger.Info($"Changing message invisible duration successfully, message=id={message.MessageId}");
+ }
+
+ // Close the consumer if you don't need it anymore.
+ await simpleConsumer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Address.cs b/csharp/rocketmq-client-csharp/Address.cs
index fca83530..a66a6369 100644
--- a/csharp/rocketmq-client-csharp/Address.cs
+++ b/csharp/rocketmq-client-csharp/Address.cs
@@ -1,11 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class Address : IEquatable<Address>
{
- public Address(rmq.Address address)
+ public Address(Proto.Address address)
{
Host = address.Host;
Port = address.Port;
@@ -21,9 +38,9 @@ namespace Org.Apache.Rocketmq
public int Port { get; }
- public rmq.Address ToProtobuf()
+ public Proto.Address ToProtobuf()
{
- return new rmq.Address
+ return new Proto.Address
{
Host = Host,
Port = Port
diff --git a/csharp/rocketmq-client-csharp/AddressScheme.cs b/csharp/rocketmq-client-csharp/AddressScheme.cs
index 6f36f546..06a5ea32 100644
--- a/csharp/rocketmq-client-csharp/AddressScheme.cs
+++ b/csharp/rocketmq-client-csharp/AddressScheme.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -28,17 +28,17 @@ namespace Org.Apache.Rocketmq
public static class AddressSchemeHelper
{
- public static rmq.AddressScheme ToProtobuf(AddressScheme scheme)
+ public static Proto.AddressScheme ToProtobuf(AddressScheme scheme)
{
switch (scheme)
{
case AddressScheme.Ipv4:
- return rmq.AddressScheme.Ipv4;
+ return Proto.AddressScheme.Ipv4;
case AddressScheme.Ipv6:
- return rmq.AddressScheme.Ipv6;
+ return Proto.AddressScheme.Ipv6;
case AddressScheme.DomainName:
default:
- return rmq.AddressScheme.DomainName;
+ return Proto.AddressScheme.DomainName;
}
}
}
diff --git a/csharp/rocketmq-client-csharp/Broker.cs b/csharp/rocketmq-client-csharp/Broker.cs
index 6b426a5a..6a2f957a 100644
--- a/csharp/rocketmq-client-csharp/Broker.cs
+++ b/csharp/rocketmq-client-csharp/Broker.cs
@@ -31,5 +31,15 @@ namespace Org.Apache.Rocketmq
public string Name { get; }
public int Id { get; }
public Endpoints Endpoints { get; }
+
+ public Proto.Broker ToProtobuf()
+ {
+ return new Proto.Broker()
+ {
+ Name = Name,
+ Id = Id,
+ Endpoints = Endpoints.ToProtobuf()
+ };
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 51ecb936..3b260022 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -44,8 +44,8 @@ namespace Org.Apache.Rocketmq
protected readonly IClientManager ClientManager;
protected readonly string ClientId;
- protected readonly ConcurrentDictionary<string, bool> Topics;
-
+ protected readonly ICollection<string> Topics;
+
protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache;
private readonly CancellationTokenSource _telemetryCts;
@@ -53,7 +53,7 @@ namespace Org.Apache.Rocketmq
private readonly Dictionary<Endpoints, Session> _sessionsTable;
private readonly ReaderWriterLockSlim _sessionLock;
- protected Client(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics)
+ protected Client(ClientConfig clientConfig, ICollection<string> topics)
{
ClientConfig = clientConfig;
Topics = topics;
@@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq
ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, _topicRouteUpdateCtx.Token);
ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, _heartbeatCts.Token);
ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, _settingsSyncCtx.Token);
- foreach (var topic in Topics.Keys)
+ foreach (var topic in Topics)
{
await FetchTopicRoute(topic);
}
@@ -135,7 +135,7 @@ namespace Org.Apache.Rocketmq
protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
- protected abstract void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData);
+ protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);
private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
@@ -159,7 +159,7 @@ namespace Org.Apache.Rocketmq
}
_topicRouteCache[topic] = topicRouteData;
- OnTopicRouteDataFetched0(topic, topicRouteData);
+ OnTopicRouteDataUpdated0(topic, topicRouteData);
}
@@ -183,7 +183,7 @@ namespace Org.Apache.Rocketmq
private async void UpdateTopicRouteCache()
{
Logger.Info($"Start to update topic route cache for a new round, clientId={ClientId}");
- foreach (var topic in Topics.Keys)
+ foreach (var topic in Topics)
{
var topicRouteData = await FetchTopicRoute(topic);
_topicRouteCache[topic] = topicRouteData;
@@ -250,6 +250,7 @@ namespace Org.Apache.Rocketmq
Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
$"statusMessage={response.Status.Message}");
}
+
StatusChecker.Check(response.Status, request);
var messageQueues = response.MessageQueues.ToList();
@@ -266,13 +267,14 @@ namespace Org.Apache.Rocketmq
}
var request = WrapHeartbeatRequest();
- Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new ();
+ Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
// Collect task into a map.
foreach (var item in endpoints)
{
var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
- responses[item]= task;
+ responses[item] = task;
}
+
foreach (var item in responses.Keys)
{
var response = await responses[item];
@@ -291,11 +293,10 @@ namespace Org.Apache.Rocketmq
}
var statusMessage = response.Status.Message;
- Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
+ Logger.Info(
+ $"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
}
}
-
-
public grpc.Metadata Sign()
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index 3eef2fe6..967a2ac1 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using System;
using System.Threading;
using System.Threading.Tasks;
@@ -78,7 +78,7 @@ namespace Org.Apache.Rocketmq
_clientLock.EnterReadLock();
try
{
- List<Task> tasks = new List<Task>();
+ var tasks = new List<Task>();
foreach (var item in _rpcClients)
{
tasks.Add(item.Value.Shutdown());
@@ -92,56 +92,57 @@ namespace Org.Apache.Rocketmq
}
}
- public grpc::AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(
+ public grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(
Endpoints endpoints)
{
return GetRpcClient(endpoints).Telemetry(_client.Sign());
}
- public async Task<rmq.QueryRouteResponse> QueryRoute(Endpoints endpoints, rmq.QueryRouteRequest request,
+ public async Task<Proto.QueryRouteResponse> QueryRoute(Endpoints endpoints, Proto.QueryRouteRequest request,
TimeSpan timeout)
{
return await GetRpcClient(endpoints).QueryRoute(_client.Sign(), request, timeout);
}
- public async Task<rmq.HeartbeatResponse> Heartbeat(Endpoints endpoints, rmq.HeartbeatRequest request,
+ public async Task<Proto.HeartbeatResponse> Heartbeat(Endpoints endpoints, Proto.HeartbeatRequest request,
TimeSpan timeout)
{
return await GetRpcClient(endpoints).Heartbeat(_client.Sign(), request, timeout);
}
- public async Task<rmq.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
- rmq.NotifyClientTerminationRequest request, TimeSpan timeout)
+ public async Task<Proto.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
+ Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
{
return await GetRpcClient(endpoints).NotifyClientTermination(_client.Sign(), request, timeout);
}
- public async Task<rmq::SendMessageResponse> SendMessage(Endpoints endpoints, rmq::SendMessageRequest request,
+ public async Task<Proto::SendMessageResponse> SendMessage(Endpoints endpoints,
+ Proto::SendMessageRequest request,
TimeSpan timeout)
{
return await GetRpcClient(endpoints).SendMessage(_client.Sign(), request, timeout);
}
- public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
- rmq.QueryAssignmentRequest request, TimeSpan timeout)
+ public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
+ Proto.QueryAssignmentRequest request, TimeSpan timeout)
{
return await GetRpcClient(endpoints).QueryAssignment(_client.Sign(), request, timeout);
}
- public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
- rmq.ReceiveMessageRequest request, TimeSpan timeout)
+ public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
+ Proto.ReceiveMessageRequest request, TimeSpan timeout)
{
return await GetRpcClient(endpoints).ReceiveMessage(_client.Sign(), request, timeout);
}
- public async Task<rmq::AckMessageResponse> AckMessage(Endpoints endpoints,
- rmq.AckMessageRequest request, TimeSpan timeout)
+ public async Task<Proto::AckMessageResponse> AckMessage(Endpoints endpoints,
+ Proto.AckMessageRequest request, TimeSpan timeout)
{
return await GetRpcClient(endpoints).AckMessage(_client.Sign(), request, timeout);
}
- public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
- rmq.ChangeInvisibleDurationRequest request, TimeSpan timeout)
+ public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
+ Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
{
return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
}
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
new file mode 100644
index 00000000..25df84d3
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public abstract class Consumer : Client
+ {
+ protected readonly string ConsumerGroup;
+
+ protected Consumer(ClientConfig clientConfig, string consumerGroup, ICollection<string> topics) : base(
+ clientConfig, topics)
+ {
+ ConsumerGroup = consumerGroup;
+ }
+
+ protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
+ TimeSpan awaitDuration)
+ {
+ var tolerance = ClientConfig.RequestTimeout;
+ var timeout = tolerance.Add(awaitDuration);
+ var response = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
+ var status = new Proto.Status()
+ {
+ Code = Proto.Code.InternalServerError,
+ Message = "Status was not set by server"
+ };
+ var messageList = new List<Proto.Message>();
+ foreach (var entry in response)
+ {
+ switch (entry.ContentCase)
+ {
+ case Proto.ReceiveMessageResponse.ContentOneofCase.Status:
+ status = entry.Status;
+ break;
+ case Proto.ReceiveMessageResponse.ContentOneofCase.Message:
+ messageList.Add(entry.Message);
+ break;
+ case Proto.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
+ case Proto.ReceiveMessageResponse.ContentOneofCase.None:
+ default:
+ break;
+ }
+ }
+
+ var messages = messageList.Select(message => MessageView.FromProtobuf(message, mq)).ToList();
+ StatusChecker.Check(status, request);
+ return new ReceiveMessageResult(mq.Broker.Endpoints, messages);
+ }
+
+ private static Proto.FilterExpression WrapFilterExpression(FilterExpression filterExpression)
+ {
+ var filterType = Proto.FilterType.Tag;
+ if (ExpressionType.Sql92.Equals(filterExpression.Type))
+ {
+ filterType = Proto.FilterType.Sql;
+ }
+
+ return new Proto.FilterExpression
+ {
+ Type = filterType,
+ Expression = filterExpression.Expression
+ };
+ }
+
+ protected static Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+ FilterExpression filterExpression, TimeSpan invisibleDuration)
+ {
+ return new Proto.ReceiveMessageRequest()
+ {
+ MessageQueue = mq.ToProtobuf(),
+ FilterExpression = WrapFilterExpression(filterExpression),
+ BatchSize = batchSize,
+ AutoRenew = false,
+ InvisibleDuration = Duration.FromTimeSpan(invisibleDuration)
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs b/csharp/rocketmq-client-csharp/Error/StatusChecker.cs
deleted file mode 100644
index 55f56d57..00000000
--- a/csharp/rocketmq-client-csharp/Error/StatusChecker.cs
+++ /dev/null
@@ -1,41 +0,0 @@
-using Apache.Rocketmq.V2;
-using Google.Protobuf;
-
-namespace Org.Apache.Rocketmq.Error
-{
- public class StatusChecker
- {
- public static void check(Status status, IMessage message)
- {
- // var code = status.Code;
- // switch (code)
- // {
- // case Code.Ok:
- // case Code.MultipleResults:
- // return;
- // case Code.BadRequest:
- // case Code.IllegalAccessPoint:
- // case Code.IllegalTopic:
- // case Code.IllegalConsumerGroup:
- // case Code.IllegalMessageTag:
- // case Code.IllegalMessageKey:
- // case Code.IllegalMessageGroup:
- // case Code.IllegalMessagePropertyKey:
- // case Code.InvalidTransactionId:
- // case Code.IllegalMessageId:
- // case Code.IllegalFilterExpression:
- // case Code.IllegalInvisibleTime:
- // case Code.IllegalDeliveryTime:
- // case Code.InvalidReceiptHandle:
- // case Code.MessagePropertyConflictWithType:
- // case Code.UnrecognizedClientType:
- // case Code.MessageCorrupted:
- // case Code.ClientIdRequired:
- // case Code.IllegalPollingTime:
- // throw new BadRequestException(code)
- //
- // case ILLEGAL_POLLING_TIME:
- // }
- }
- }
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/FilterExpression.cs b/csharp/rocketmq-client-csharp/FilterExpression.cs
index 3bd432da..07a4af5a 100644
--- a/csharp/rocketmq-client-csharp/FilterExpression.cs
+++ b/csharp/rocketmq-client-csharp/FilterExpression.cs
@@ -25,6 +25,12 @@ namespace Org.Apache.Rocketmq
Type = type;
}
+ public FilterExpression(string expression)
+ {
+ Expression = expression;
+ Type = ExpressionType.Tag;
+ }
+
public ExpressionType Type { get; }
public string Expression { get; }
}
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index f2e48e36..df2035ab 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -25,15 +25,48 @@ namespace Org.Apache.Rocketmq
{
public interface IClientManager
{
+ /// <summary>
+ /// Establish a telemetry channel between client and remote endpoints.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <returns>gRPC bi-directional stream.</returns>
AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints);
+ /// <summary>
+ /// Query topic route info from remote.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <param name="request">gRPC request of querying topic route.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns>Task of response.</returns>
Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Send heartbeat to remote endpoints.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <param name="request">gRPC request of heartbeat.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns>Task of response.</returns>
Task<HeartbeatResponse> Heartbeat(Endpoints endpoints, HeartbeatRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Notify client's termination.
+ /// </summary>
+ /// <param name="endpoints">The target endpoints.</param>
+ /// <param name="request">gRPC request of notifying client's termination.</param>
+ /// <param name="timeout">Request max duration.</param>
+ /// <returns>Task of response.</returns>
Task<NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
NotifyClientTerminationRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Send message to remote endpoints.
+ /// </summary>
+ /// <param name="endpoints"></param>
+ /// <param name="request"></param>
+ /// <param name="timeout"></param>
+ /// <returns></returns>
Task<SendMessageResponse> SendMessage(Endpoints endpoints, SendMessageRequest request,
TimeSpan timeout);
diff --git a/csharp/rocketmq-client-csharp/MessageQueue.cs b/csharp/rocketmq-client-csharp/MessageQueue.cs
index cd6f0ce3..580fbafe 100644
--- a/csharp/rocketmq-client-csharp/MessageQueue.cs
+++ b/csharp/rocketmq-client-csharp/MessageQueue.cs
@@ -16,23 +16,20 @@
*/
using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
+using System.Linq;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class MessageQueue
{
- public MessageQueue(rmq::MessageQueue messageQueue)
+ public MessageQueue(Proto::MessageQueue messageQueue)
{
TopicResource = new Resource(messageQueue.Topic);
QueueId = messageQueue.Id;
Permission = PermissionHelper.FromProtobuf(messageQueue.Permission);
- var messageTypes = new List<MessageType>();
- foreach (var acceptMessageType in messageQueue.AcceptMessageTypes)
- {
- var messageType = MessageTypeHelper.FromProtobuf(acceptMessageType);
- messageTypes.Add(messageType);
- }
+ var messageTypes = messageQueue.AcceptMessageTypes
+ .Select(MessageTypeHelper.FromProtobuf).ToList();
AcceptMessageTypes = messageTypes;
Broker = new Broker(messageQueue.Broker);
@@ -57,5 +54,19 @@ namespace Org.Apache.Rocketmq
{
return $"{Broker.Name}.{TopicResource}.{QueueId}";
}
+
+ public Proto.MessageQueue ToProtobuf()
+ {
+ var messageTypes = AcceptMessageTypes.Select(MessageTypeHelper.ToProtobuf).ToList();
+
+ return new Proto.MessageQueue
+ {
+ Topic = TopicResource.ToProtobuf(),
+ Id = QueueId,
+ Permission = PermissionHelper.ToProtobuf(Permission),
+ Broker = Broker.ToProtobuf(),
+ AcceptMessageTypes = { messageTypes }
+ };
+ }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index b790fcb9..dfb45e45 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Security.Cryptography;
using NLog;
@@ -30,14 +31,14 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- private readonly rmq.MessageQueue _messageQueue;
- private readonly string _receiptHandle;
+ internal readonly MessageQueue MessageQueue;
+ internal readonly string ReceiptHandle;
private readonly long _offset;
private readonly bool _corrupted;
internal MessageView(string messageId, string topic, byte[] body, string tag, string messageGroup,
DateTime deliveryTime, List<string> keys, Dictionary<string, string> properties, string bornHost,
- DateTime bornTime, int deliveryAttempt, rmq.MessageQueue messageQueue, string receiptHandle, long offset,
+ DateTime bornTime, int deliveryAttempt, MessageQueue messageQueue, string receiptHandle, long offset,
bool corrupted)
{
MessageId = messageId;
@@ -51,8 +52,8 @@ namespace Org.Apache.Rocketmq
BornHost = bornHost;
BornTime = bornTime;
DeliveryAttempt = deliveryAttempt;
- _messageQueue = messageQueue;
- _receiptHandle = receiptHandle;
+ MessageQueue = messageQueue;
+ ReceiptHandle = receiptHandle;
_offset = offset;
_corrupted = corrupted;
}
@@ -79,7 +80,7 @@ namespace Org.Apache.Rocketmq
public int DeliveryAttempt { get; }
- public static MessageView FromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue)
+ public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue)
{
var topic = message.Topic.Name;
var systemProperties = message.SystemProperties;
@@ -87,11 +88,11 @@ namespace Org.Apache.Rocketmq
var bodyDigest = systemProperties.BodyDigest;
var checkSum = bodyDigest.Checksum;
var raw = message.Body.ToByteArray();
- bool corrupted = false;
+ var corrupted = false;
var type = bodyDigest.Type;
switch (type)
{
- case rmq.DigestType.Crc32:
+ case Proto.DigestType.Crc32:
{
var expectedCheckSum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length).ToString("X");
if (!expectedCheckSum.Equals(checkSum))
@@ -101,7 +102,7 @@ namespace Org.Apache.Rocketmq
break;
}
- case rmq.DigestType.Md5:
+ case Proto.DigestType.Md5:
{
var expectedCheckSum = Convert.ToHexString(MD5.HashData(raw));
if (!expectedCheckSum.Equals(checkSum))
@@ -111,7 +112,7 @@ namespace Org.Apache.Rocketmq
break;
}
- case rmq.DigestType.Sha1:
+ case Proto.DigestType.Sha1:
{
var expectedCheckSum = Convert.ToHexString(SHA1.HashData(raw));
if (!expectedCheckSum.Equals(checkSum))
@@ -121,6 +122,7 @@ namespace Org.Apache.Rocketmq
break;
}
+ case Proto.DigestType.Unspecified:
default:
{
Logger.Error(
@@ -131,18 +133,19 @@ namespace Org.Apache.Rocketmq
}
var bodyEncoding = systemProperties.BodyEncoding;
- byte[] body = raw;
+ var body = raw;
switch (bodyEncoding)
{
- case rmq.Encoding.Gzip:
+ case Proto.Encoding.Gzip:
{
body = Utilities.DecompressBytesGzip(message.Body.ToByteArray());
break;
}
- case rmq.Encoding.Identity:
+ case Proto.Encoding.Identity:
{
break;
}
+ case Proto.Encoding.Unspecified:
default:
{
Logger.Error($"Unsupported message encoding algorithm," +
@@ -151,25 +154,22 @@ namespace Org.Apache.Rocketmq
}
}
- string tag = systemProperties.HasTag ? systemProperties.Tag : null;
- string messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null;
+ var tag = systemProperties.HasTag ? systemProperties.Tag : null;
+ var messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null;
var deliveryTime = systemProperties.DeliveryTimestamp.ToDateTime();
- List<string> keys = new List<string>();
- foreach (var key in systemProperties.Keys)
- {
- keys.Add(key);
- }
+ var keys = systemProperties.Keys.ToList();
var bornHost = systemProperties.BornHost;
var bornTime = systemProperties.BornTimestamp.ToDateTime();
var deliveryAttempt = systemProperties.DeliveryAttempt;
var queueOffset = systemProperties.QueueOffset;
- Dictionary<string, string> properties = new Dictionary<string, string>();
+ var properties = new Dictionary<string, string>();
foreach (var (key, value) in message.UserProperties)
{
properties.Add(key, value);
}
+
var receiptHandle = systemProperties.ReceiptHandle;
return new MessageView(messageId, topic, body, tag, messageGroup, deliveryTime, keys, properties, bornHost,
bornTime, deliveryAttempt, messageQueue, receiptHandle, queueOffset, corrupted);
diff --git a/csharp/rocketmq-client-csharp/MqEncoding.cs b/csharp/rocketmq-client-csharp/MqEncoding.cs
index 27dfb052..ce6f4779 100644
--- a/csharp/rocketmq-client-csharp/MqEncoding.cs
+++ b/csharp/rocketmq-client-csharp/MqEncoding.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -27,13 +27,13 @@ namespace Org.Apache.Rocketmq
public static class EncodingHelper
{
- public static rmq.Encoding ToProtobuf(MqEncoding mqEncoding)
+ public static Proto.Encoding ToProtobuf(MqEncoding mqEncoding)
{
return mqEncoding switch
{
- MqEncoding.Gzip => rmq.Encoding.Gzip,
- MqEncoding.Identity => rmq.Encoding.Identity,
- _ => rmq.Encoding.Unspecified
+ MqEncoding.Gzip => Proto.Encoding.Gzip,
+ MqEncoding.Identity => Proto.Encoding.Identity,
+ _ => Proto.Encoding.Unspecified
};
}
}
diff --git a/csharp/rocketmq-client-csharp/Permission.cs b/csharp/rocketmq-client-csharp/Permission.cs
index eedd6247..20d2828d 100644
--- a/csharp/rocketmq-client-csharp/Permission.cs
+++ b/csharp/rocketmq-client-csharp/Permission.cs
@@ -46,8 +46,24 @@ namespace Org.Apache.Rocketmq
throw new InternalErrorException("Permission is not specified");
}
}
-
- public static bool IsWritable(Permission permission) {
+
+ public static Proto.Permission ToProtobuf(Permission permission)
+ {
+ switch (permission)
+ {
+ case Permission.Read:
+ return Proto.Permission.Read;
+ case Permission.Write:
+ return Proto.Permission.Write;
+ case Permission.ReadWrite:
+ return Proto.Permission.ReadWrite;
+ default:
+ throw new InternalErrorException("Permission is not specified");
+ }
+ }
+
+ public static bool IsWritable(Permission permission)
+ {
switch (permission)
{
case Permission.Write:
@@ -74,6 +90,4 @@ namespace Org.Apache.Rocketmq
}
}
}
-
-
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index fc41926b..62387a98 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -25,7 +25,7 @@ using NLog;
namespace Org.Apache.Rocketmq
{
- public class Producer : Client, IProducer
+ public class Producer : Client
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
@@ -42,7 +42,7 @@ namespace Org.Apache.Rocketmq
}
private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
- base(clientConfig, topics)
+ base(clientConfig, topics.Keys)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
_publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
@@ -54,7 +54,7 @@ namespace Org.Apache.Rocketmq
{
foreach (var topic in topics)
{
- Topics[topic] = false;
+ Topics.Add(topic);
}
}
@@ -94,7 +94,7 @@ namespace Org.Apache.Rocketmq
return publishingLoadBalancer;
}
- protected override void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData)
+ protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
{
var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
_publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
diff --git a/csharp/rocketmq-client-csharp/Publishing.cs b/csharp/rocketmq-client-csharp/Publishing.cs
index ffedd177..055e36ce 100644
--- a/csharp/rocketmq-client-csharp/Publishing.cs
+++ b/csharp/rocketmq-client-csharp/Publishing.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using System.Collections.Generic;
namespace Org.Apache.Rocketmq
@@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq
// Settings for publishing
public class Publishing
{
- public List<rmq::Resource> Topics { get; set; }
+ public List<Proto::Resource> Topics { get; set; }
public int CompressBodyThreshold { get; set; }
public int MaxBodySize { get; set; }
diff --git a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
index c33bc7dc..2133af5a 100644
--- a/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/PublishingLoadBalancer.cs
@@ -18,23 +18,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class PublishingLoadBalancer
{
private readonly List<MessageQueue> _messageQueues;
+
+ // TODO
private int _roundRobinIndex;
public PublishingLoadBalancer(TopicRouteData route)
{
_messageQueues = new List<MessageQueue>();
- foreach (var messageQueue in route.MessageQueues.Where(messageQueue =>
+ foreach (var mq in route.MessageQueues.Where(messageQueue =>
PermissionHelper.IsWritable(messageQueue.Permission) &&
Utilities.MasterBrokerId == messageQueue.Broker.Id))
{
- _messageQueues.Add(messageQueue);
+ _messageQueues.Add(mq);
}
var random = new Random();
@@ -70,21 +71,23 @@ namespace Org.Apache.Rocketmq
}
}
- if (candidates.Count != 0) return candidates;
+ if (candidates.Count != 0)
+ {
+ return candidates;
+ }
+
+ foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count))
+ .Select(positiveMod => _messageQueues[positiveMod]))
{
- foreach (var mq in _messageQueues.Select(_ => Utilities.GetPositiveMod(next++, _messageQueues.Count))
- .Select(positiveMod => _messageQueues[positiveMod]))
+ if (!candidateBrokerNames.Contains(mq.Broker.Name))
{
- if (!candidateBrokerNames.Contains(mq.Broker.Name))
- {
- candidateBrokerNames.Add(mq.Broker.Name);
- candidates.Add(mq);
- }
+ candidateBrokerNames.Add(mq.Broker.Name);
+ candidates.Add(mq);
+ }
- if (candidates.Count >= count)
- {
- break;
- }
+ if (candidates.Count >= count)
+ {
+ break;
}
}
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index 023b0be3..b9f8f454 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -17,7 +17,7 @@
using System;
using System.Collections.Concurrent;
-using System.Collections.Generic;
+using System.Linq;
using Google.Protobuf.WellKnownTypes;
using Proto = Apache.Rocketmq.V2;
@@ -28,8 +28,8 @@ namespace Org.Apache.Rocketmq
private volatile int _maxBodySizeBytes = 4 * 1024 * 1024;
private volatile bool _validateMessageType = true;
- public PublishingSettings(string clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
- TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, accessPoint,
+ public PublishingSettings(string clientId, Endpoints endpoints, ExponentialBackoffRetryPolicy retryPolicy,
+ TimeSpan requestTimeout, ConcurrentDictionary<string, bool> topics) : base(clientId, ClientType.Producer, endpoints,
retryPolicy, requestTimeout)
{
Topics = topics;
@@ -54,14 +54,7 @@ namespace Org.Apache.Rocketmq
public override Proto.Settings ToProtobuf()
{
- List<Proto.Resource> topics = new List<Proto.Resource>();
- foreach (var topic in Topics)
- {
- topics.Add(new Proto.Resource
- {
- Name = topic.Key
- });
- }
+ var topics = Topics.Select(topic => new Proto.Resource { Name = topic.Key }).ToList();
var publishing = new Proto.Publishing();
publishing.Topics.Add(topics);
@@ -69,7 +62,7 @@ namespace Org.Apache.Rocketmq
return new Proto.Settings
{
Publishing = publishing,
- AccessPoint = AccessPoint.ToProtobuf(),
+ AccessPoint = Endpoints.ToProtobuf(),
ClientType = ClientTypeHelper.ToProtobuf(ClientType),
RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
BackoffPolicy = RetryPolicy.ToProtobuf(),
diff --git a/csharp/rocketmq-client-csharp/IProducer.cs b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
similarity index 71%
rename from csharp/rocketmq-client-csharp/IProducer.cs
rename to csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
index 420af202..1898838b 100644
--- a/csharp/rocketmq-client-csharp/IProducer.cs
+++ b/csharp/rocketmq-client-csharp/ReceiveMessageResult.cs
@@ -15,16 +15,20 @@
* limitations under the License.
*/
-using System.Threading.Tasks;
+using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
- public interface IProducer
+ public class ReceiveMessageResult
{
- Task Start();
+ public ReceiveMessageResult(Endpoints endpoints, List<MessageView> messages)
+ {
+ Endpoints = endpoints;
+ Messages = messages;
+ }
- Task Shutdown();
+ public Endpoints Endpoints { get; }
- Task<SendReceipt> Send(Message message);
+ public List<MessageView> Messages { get; }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs
index a1825f15..76c0ba9e 100644
--- a/csharp/rocketmq-client-csharp/Resource.cs
+++ b/csharp/rocketmq-client-csharp/Resource.cs
@@ -11,6 +11,12 @@ namespace Org.Apache.Rocketmq
Name = resource.Name;
}
+ public Resource(string name)
+ {
+ Namespace = "";
+ Name = name;
+ }
+
public string Namespace { get; }
public string Name { get; }
@@ -22,7 +28,7 @@ namespace Org.Apache.Rocketmq
Name = Name
};
}
-
+
public override string ToString()
{
return String.IsNullOrEmpty(Namespace) ? Name : $"{Namespace}.{Name}";
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index de28c184..bf45410c 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -21,7 +21,7 @@ using System.Net.Http;
using System.Net.Security;
using System.Threading;
using System.Threading.Tasks;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Grpc.Net.Client;
@@ -32,7 +32,7 @@ namespace Org.Apache.Rocketmq
public class RpcClient : IRpcClient
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- private readonly rmq::MessagingService.MessagingServiceClient _stub;
+ private readonly Proto::MessagingService.MessagingServiceClient _stub;
private readonly GrpcChannel _channel;
private readonly string _target;
@@ -44,7 +44,7 @@ namespace Org.Apache.Rocketmq
HttpHandler = CreateHttpHandler()
});
var invoker = _channel.Intercept(new ClientLoggerInterceptor());
- _stub = new rmq::MessagingService.MessagingServiceClient(invoker);
+ _stub = new Proto::MessagingService.MessagingServiceClient(invoker);
}
public async Task Shutdown()
@@ -74,14 +74,14 @@ namespace Org.Apache.Rocketmq
return handler;
}
- public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
+ public AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> Telemetry(Metadata metadata)
{
var deadline = DateTime.UtcNow.Add(TimeSpan.FromDays(3650));
var callOptions = new CallOptions(metadata, deadline);
return _stub.Telemetry(callOptions);
}
- public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request,
+ public async Task<Proto::QueryRouteResponse> QueryRoute(Metadata metadata, Proto::QueryRouteRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -92,7 +92,7 @@ namespace Org.Apache.Rocketmq
}
- public async Task<rmq::HeartbeatResponse> Heartbeat(Metadata metadata, rmq::HeartbeatRequest request,
+ public async Task<Proto::HeartbeatResponse> Heartbeat(Metadata metadata, Proto::HeartbeatRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -102,7 +102,7 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::SendMessageResponse> SendMessage(Metadata metadata, rmq::SendMessageRequest request,
+ public async Task<Proto::SendMessageResponse> SendMessage(Metadata metadata, Proto::SendMessageRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -112,8 +112,8 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::QueryAssignmentResponse> QueryAssignment(Metadata metadata,
- rmq::QueryAssignmentRequest request,
+ public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Metadata metadata,
+ Proto::QueryAssignmentRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -123,14 +123,14 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
- rmq::ReceiveMessageRequest request, TimeSpan timeout)
+ public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
+ Proto::ReceiveMessageRequest request, TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
var call = _stub.ReceiveMessage(request, callOptions);
Logger.Debug($"ReceiveMessageRequest has been written to {_target}");
- var result = new List<rmq::ReceiveMessageResponse>();
+ var result = new List<Proto::ReceiveMessageResponse>();
var stream = call.ResponseStream;
while (await stream.MoveNext())
{
@@ -143,7 +143,7 @@ namespace Org.Apache.Rocketmq
return result;
}
- public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+ public async Task<Proto::AckMessageResponse> AckMessage(Metadata metadata, Proto::AckMessageRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -153,8 +153,8 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata,
- rmq::ChangeInvisibleDurationRequest request,
+ public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Metadata metadata,
+ Proto::ChangeInvisibleDurationRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -164,9 +164,9 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(
+ public async Task<Proto::ForwardMessageToDeadLetterQueueResponse> ForwardMessageToDeadLetterQueue(
Metadata metadata,
- rmq::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
+ Proto::ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
@@ -175,8 +175,8 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::EndTransactionResponse> EndTransaction(Metadata metadata,
- rmq::EndTransactionRequest request,
+ public async Task<Proto::EndTransactionResponse> EndTransaction(Metadata metadata,
+ Proto::EndTransactionRequest request,
TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
@@ -186,8 +186,8 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<rmq::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
- rmq::NotifyClientTerminationRequest request, TimeSpan timeout)
+ public async Task<Proto::NotifyClientTerminationResponse> NotifyClientTermination(Metadata metadata,
+ Proto::NotifyClientTerminationRequest request, TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs
index 7716fc2d..491aa564 100644
--- a/csharp/rocketmq-client-csharp/Settings.cs
+++ b/csharp/rocketmq-client-csharp/Settings.cs
@@ -24,25 +24,25 @@ namespace Org.Apache.Rocketmq
{
protected readonly string ClientId;
protected readonly ClientType ClientType;
- protected readonly Endpoints AccessPoint;
+ protected readonly Endpoints Endpoints;
protected volatile IRetryPolicy RetryPolicy;
protected readonly TimeSpan RequestTimeout;
- public Settings(string clientId, ClientType clientType, Endpoints accessPoint, IRetryPolicy retryPolicy,
+ public Settings(string clientId, ClientType clientType, Endpoints endpoints, IRetryPolicy retryPolicy,
TimeSpan requestTimeout)
{
ClientId = clientId;
ClientType = clientType;
- AccessPoint = accessPoint;
+ Endpoints = endpoints;
RetryPolicy = retryPolicy;
RequestTimeout = requestTimeout;
}
- public Settings(string clientId, ClientType clientType, Endpoints accessPoint, TimeSpan requestTimeout)
+ public Settings(string clientId, ClientType clientType, Endpoints endpoints, TimeSpan requestTimeout)
{
ClientId = clientId;
ClientType = clientType;
- AccessPoint = accessPoint;
+ Endpoints = endpoints;
RetryPolicy = null;
RequestTimeout = requestTimeout;
}
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
new file mode 100644
index 00000000..1a0f0ec2
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -0,0 +1,192 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Proto = Apache.Rocketmq.V2;
+using NLog;
+using Org.Apache.Rocketmq.Error;
+
+namespace Org.Apache.Rocketmq
+{
+ public class SimpleConsumer : Consumer
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ private readonly ConcurrentDictionary<string /* topic */, SubscriptionLoadBalancer> _subscriptionRouteDataCache;
+ private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
+ private readonly TimeSpan _awaitDuration;
+ private readonly SimpleSubscriptionSettings _simpleSubscriptionSettings;
+ private int _topicRoundRobinIndex;
+
+ public SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
+ Dictionary<string, FilterExpression> subscriptionExpressions) : this(clientConfig, consumerGroup,
+ awaitDuration, new ConcurrentDictionary<string, FilterExpression>(subscriptionExpressions))
+ {
+ }
+
+ private SimpleConsumer(ClientConfig clientConfig, string consumerGroup, TimeSpan awaitDuration,
+ ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(clientConfig, consumerGroup,
+ subscriptionExpressions.Keys)
+ {
+ _awaitDuration = awaitDuration;
+ _subscriptionRouteDataCache = new ConcurrentDictionary<string, SubscriptionLoadBalancer>();
+ _subscriptionExpressions = subscriptionExpressions;
+ _simpleSubscriptionSettings = new SimpleSubscriptionSettings(ClientId, clientConfig.Endpoints,
+ ConsumerGroup, clientConfig.RequestTimeout, awaitDuration, subscriptionExpressions);
+ _topicRoundRobinIndex = 0;
+ }
+
+ public async Task Subscribe(string topic, FilterExpression filterExpression)
+ {
+ // TODO: check running status.
+ await GetSubscriptionLoadBalancer(topic);
+ _subscriptionExpressions.TryAdd(topic, filterExpression);
+ }
+
+ public void Unsubscribe(string topic)
+ {
+ _subscriptionExpressions.TryRemove(topic, out _);
+ }
+
+ public override async Task Start()
+ {
+ Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
+ await base.Start();
+ Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+ }
+
+ public override async Task Shutdown()
+ {
+ Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
+ await base.Shutdown();
+ Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+ }
+
+ protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
+ {
+ return new Proto::HeartbeatRequest
+ {
+ ClientType = Proto.ClientType.SimpleConsumer
+ };
+ }
+
+ protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData)
+ {
+ var subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
+ _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
+ }
+
+ public override Proto.Settings GetSettings()
+ {
+ return _simpleSubscriptionSettings.ToProtobuf();
+ }
+
+ public override void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
+ {
+ _simpleSubscriptionSettings.Sync(settings);
+ }
+
+
+ private async Task<SubscriptionLoadBalancer> GetSubscriptionLoadBalancer(string topic)
+ {
+ if (_subscriptionRouteDataCache.TryGetValue(topic, out var subscriptionLoadBalancer))
+ {
+ return subscriptionLoadBalancer;
+ }
+
+ var topicRouteData = await FetchTopicRoute(topic);
+ subscriptionLoadBalancer = new SubscriptionLoadBalancer(topicRouteData);
+ _subscriptionRouteDataCache.TryAdd(topic, subscriptionLoadBalancer);
+
+ return subscriptionLoadBalancer;
+ }
+
+
+ public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
+ {
+ if (maxMessageNum <= 0)
+ {
+ throw new InternalErrorException("maxMessageNum must be greater than 0");
+ }
+
+ var copy = new ConcurrentDictionary<string, FilterExpression>(_subscriptionExpressions);
+ var topics = new List<string>(copy.Keys);
+ if (topics.Count <= 0)
+ {
+ throw new ArgumentException("There is no topic to receive message");
+ }
+
+ var index = Utilities.GetPositiveMod(Interlocked.Increment(ref _topicRoundRobinIndex), topics.Count);
+ var topic = topics[index];
+ var filterExpression = _subscriptionExpressions[topic];
+ var subscriptionLoadBalancer = await GetSubscriptionLoadBalancer(topic);
+
+ var mq = subscriptionLoadBalancer.TakeMessageQueue();
+ var request = WrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration);
+ var receiveMessageResult = await ReceiveMessage(request, mq, _awaitDuration);
+ return receiveMessageResult.Messages;
+ }
+
+ public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
+ {
+ var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
+ var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
+ request, ClientConfig.RequestTimeout);
+ StatusChecker.Check(response.Status, request);
+ }
+
+
+ public async Task Ack(MessageView messageView)
+ {
+ var request = WrapAckMessageRequest(messageView);
+ var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+ ClientConfig.RequestTimeout);
+ StatusChecker.Check(response.Status, request);
+ }
+
+ private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
+ {
+ var topicResource = new Proto.Resource
+ {
+ Name = messageView.Topic
+ };
+ var entry = new Proto.AckMessageEntry
+ {
+ MessageId = messageView.MessageId,
+ ReceiptHandle = messageView.ReceiptHandle,
+ };
+ return new Proto.AckMessageRequest
+ {
+ Group = GetProtobufGroup(),
+ Topic = topicResource,
+ Entries = { entry }
+ };
+ }
+
+ private Proto.ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView,
+ TimeSpan invisibleDuration)
+ {
+ var topicResource = new Proto.Resource
+ {
+ Name = messageView.Topic
+ };
+ return new Proto.ChangeInvisibleDurationRequest
+ {
+ Topic = topicResource,
+ Group = GetProtobufGroup(),
+ ReceiptHandle = messageView.ReceiptHandle,
+ InvisibleDuration = Duration.FromTimeSpan(invisibleDuration),
+ MessageId = messageView.MessageId
+ };
+ }
+
+ private Proto.Resource GetProtobufGroup()
+ {
+ return new Proto.Resource()
+ {
+ Name = ConsumerGroup
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
new file mode 100644
index 00000000..a6a409d7
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/SimpleSubscriptionSettings.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Google.Protobuf.WellKnownTypes;
+using NLog;
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class SimpleSubscriptionSettings : Settings
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+ private readonly Resource _group;
+ private readonly TimeSpan _longPollingTimeout;
+ private readonly ConcurrentDictionary<string /* topic */, FilterExpression> _subscriptionExpressions;
+
+ public SimpleSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup,
+ TimeSpan requestTimeout, TimeSpan longPollingTimeout,
+ ConcurrentDictionary<string, FilterExpression> subscriptionExpressions) : base(
+ clientId, ClientType.SimpleConsumer, endpoints, requestTimeout)
+ {
+ _group = new Resource(consumerGroup);
+ _longPollingTimeout = longPollingTimeout;
+ _subscriptionExpressions = subscriptionExpressions;
+ }
+
+ public override void Sync(Proto::Settings settings)
+ {
+ // TODO
+ }
+
+ public override Proto.Settings ToProtobuf()
+ {
+ var subscriptionEntries = new List<Proto.SubscriptionEntry>();
+ foreach (var (key, value) in _subscriptionExpressions)
+ {
+ var topic = new Proto.Resource()
+ {
+ Name = key,
+ };
+ var subscriptionEntry = new Proto.SubscriptionEntry();
+ var filterExpression = new Proto.FilterExpression();
+ switch (value.Type)
+ {
+ case ExpressionType.Tag:
+ filterExpression.Type = Proto.FilterType.Tag;
+ break;
+ case ExpressionType.Sql92:
+ filterExpression.Type = Proto.FilterType.Sql;
+ break;
+ default:
+ Logger.Warn($"[Bug] Unrecognized filter type={value.Type} for simple consumer");
+ break;
+ }
+
+ filterExpression.Expression = value.Expression;
+ subscriptionEntry.Topic = topic;
+ subscriptionEntries.Add(subscriptionEntry);
+ }
+
+ var subscription = new Proto.Subscription
+ {
+ Group = _group.ToProtobuf(),
+ Subscriptions = { subscriptionEntries },
+ LongPollingTimeout = Duration.FromTimeSpan(_longPollingTimeout)
+ };
+ return new Proto.Settings
+ {
+ AccessPoint = Endpoints.ToProtobuf(),
+ ClientType = ClientTypeHelper.ToProtobuf(ClientType),
+ RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
+ Subscription = subscription,
+ UserAgent = UserAgent.Instance.ToProtobuf()
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs
index 641fd097..2802fd78 100644
--- a/csharp/rocketmq-client-csharp/StatusChecker.cs
+++ b/csharp/rocketmq-client-csharp/StatusChecker.cs
@@ -31,6 +31,7 @@ namespace Org.Apache.Rocketmq
var statusCode = status.Code;
var statusMessage = status.Message;
+ // TODO
switch (statusCode)
{
case Proto.Code.Ok:
diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
index b77da833..a8b1963e 100644
--- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
@@ -15,37 +15,35 @@
* limitations under the License.
*/
+using System;
using System.Collections.Generic;
-using System.Threading;
-using rmq = Apache.Rocketmq.V2;
+using System.Linq;
namespace Org.Apache.Rocketmq
{
internal sealed class SubscriptionLoadBalancer
{
- public List<rmq.Assignment> Assignments { get; private set; }
- private uint index = 0;
+ private readonly List<MessageQueue> _messageQueues;
+ private int _roundRobinIndex;
- public SubscriptionLoadBalancer(List<rmq.Assignment> assignments)
+ public SubscriptionLoadBalancer(TopicRouteData topicRouteData)
{
- Assignments = assignments;
- }
+ _messageQueues = new List<MessageQueue>();
+ foreach (var mq in topicRouteData.MessageQueues.Where(mq => PermissionHelper.IsReadable(mq.Permission))
+ .Where(mq => Utilities.MasterBrokerId == mq.Broker.Id))
+ {
+ _messageQueues.Add(mq);
+ }
- private SubscriptionLoadBalancer(uint oldIndex, List<rmq.Assignment> assignments)
- {
- index = oldIndex;
- Assignments = assignments;
- }
-
- public SubscriptionLoadBalancer Update(List<rmq.Assignment> newAssignments)
- {
- return new SubscriptionLoadBalancer(index, newAssignments);
+ var random = new Random();
+ _roundRobinIndex = random.Next(0, _messageQueues.Count);
}
- public rmq.MessageQueue TakeMessageQueue()
+ public MessageQueue TakeMessageQueue()
{
- var i = Interlocked.Increment(ref index);
- return Assignments[(int)(i % Assignments.Count)].MessageQueue;
+ var next = ++_roundRobinIndex;
+ var index = Utilities.GetPositiveMod(next, _messageQueues.Count);
+ return _messageQueues[index];
}
}
-}
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs
index 2be2b9a5..885db5f6 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteData.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs
@@ -18,19 +18,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class TopicRouteData : IEquatable<TopicRouteData>
{
- public TopicRouteData(List<rmq::MessageQueue> messageQueues)
+ public TopicRouteData(IEnumerable<Proto.MessageQueue> messageQueues)
{
- var messageQueuesList = new List<MessageQueue>();
- foreach (var mq in messageQueues)
- {
- messageQueuesList.Add(new MessageQueue(mq));
- }
+ var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList();
MessageQueues = messageQueuesList;
}
diff --git a/csharp/rocketmq-client-csharp/Utilities.cs b/csharp/rocketmq-client-csharp/Utilities.cs
index 592f364e..d032ae1e 100644
--- a/csharp/rocketmq-client-csharp/Utilities.cs
+++ b/csharp/rocketmq-client-csharp/Utilities.cs
@@ -22,7 +22,6 @@ using System;
using System.IO;
using System.IO.Compression;
using System.Threading;
-using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -81,10 +80,10 @@ namespace Org.Apache.Rocketmq
public static string ByteArrayToHexString(byte[] bytes)
{
- StringBuilder result = new StringBuilder(bytes.Length * 2);
+ var result = new StringBuilder(bytes.Length * 2);
const string hexAlphabet = "0123456789ABCDEF";
- foreach (byte b in bytes)
+ foreach (var b in bytes)
{
result.Append(hexAlphabet[(int)(b >> 4)]);
result.Append(hexAlphabet[(int)(b & 0xF)]);
diff --git a/csharp/tests/UnitTest1.cs b/csharp/tests/UnitTest1.cs
index af13d3dc..2f6b468a 100644
--- a/csharp/tests/UnitTest1.cs
+++ b/csharp/tests/UnitTest1.cs
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.Rocketmq;
-using Grpc.Net.Client;
-using rmq = Apache.Rocketmq.V2;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Proto = Apache.Rocketmq.V2;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -31,33 +29,32 @@ namespace tests
[TestMethod]
public void TestMethod1()
{
- rmq::Permission perm = rmq::Permission.None;
+ Proto::Permission perm = Proto::Permission.None;
switch (perm)
{
- case rmq::Permission.None:
- {
- Console.WriteLine("None");
- break;
- }
-
- case rmq::Permission.Read:
- {
- Console.WriteLine("Read");
- break;
- }
+ case Proto::Permission.None:
+ {
+ Console.WriteLine("None");
+ break;
+ }
- case rmq::Permission.Write:
- {
- Console.WriteLine("Write");
- break;
- }
+ case Proto::Permission.Read:
+ {
+ Console.WriteLine("Read");
+ break;
+ }
- case rmq::Permission.ReadWrite:
- {
- Console.WriteLine("ReadWrite");
- break;
- }
+ case Proto::Permission.Write:
+ {
+ Console.WriteLine("Write");
+ break;
+ }
+ case Proto::Permission.ReadWrite:
+ {
+ Console.WriteLine("ReadWrite");
+ break;
+ }
}
}
@@ -82,4 +79,4 @@ namespace tests
Assert.AreEqual(1, list.Count);
}
}
-}
+}
\ No newline at end of file