You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:45 UTC
[rocketmq-clients] 05/28: Polish code
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 9c5d8858460e0676e302abcc3cc64733f737852b
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Feb 7 20:00:08 2023 +0800
Polish code
---
csharp/examples/ProducerNormalMessageExample.cs | 2 +-
csharp/rocketmq-client-csharp/AccessPoint.cs | 74 ------------
csharp/rocketmq-client-csharp/Address.cs | 3 +-
.../AddressListEqualityComparer.cs | 17 +++
csharp/rocketmq-client-csharp/AddressScheme.cs | 17 +++
csharp/rocketmq-client-csharp/Broker.cs | 21 +++-
csharp/rocketmq-client-csharp/Client.cs | 47 +++++---
csharp/rocketmq-client-csharp/ClientConfig.cs | 8 --
.../ClientLoggerInterceptor.cs | 1 +
csharp/rocketmq-client-csharp/ClientManager.cs | 1 -
csharp/rocketmq-client-csharp/ClientType.cs | 20 ++--
.../ConfigFileCredentialsProvider.cs | 1 +
csharp/rocketmq-client-csharp/Endpoints.cs | 34 ++++--
.../ExponentialBackoffRetryPolicy.cs | 14 +--
csharp/rocketmq-client-csharp/IClient.cs | 30 ++++-
csharp/rocketmq-client-csharp/IClientConfig.cs | 13 +-
csharp/rocketmq-client-csharp/IClientManager.cs | 6 +-
.../rocketmq-client-csharp/ICredentialsProvider.cs | 1 +
csharp/rocketmq-client-csharp/Message.cs | 75 ++----------
csharp/rocketmq-client-csharp/MessageException.cs | 29 -----
.../rocketmq-client-csharp/MessageIdGenerator.cs | 26 ++--
csharp/rocketmq-client-csharp/MessageQueue.cs | 17 +++
csharp/rocketmq-client-csharp/MessageType.cs | 34 +++---
csharp/rocketmq-client-csharp/MessageView.cs | 2 +-
csharp/rocketmq-client-csharp/MqEncoding.cs | 30 +++--
csharp/rocketmq-client-csharp/MqLogManager.cs | 1 +
csharp/rocketmq-client-csharp/Permission.cs | 34 +++---
csharp/rocketmq-client-csharp/Producer.cs | 57 ++++++---
csharp/rocketmq-client-csharp/PublishingMessage.cs | 27 ++---
.../rocketmq-client-csharp/PublishingSettings.cs | 19 ++-
csharp/rocketmq-client-csharp/Resource.cs | 8 +-
csharp/rocketmq-client-csharp/RetryPolicy.cs | 8 +-
csharp/rocketmq-client-csharp/SendReceipt.cs | 17 +--
csharp/rocketmq-client-csharp/SequenceGenerator.cs | 131 ---------------------
csharp/rocketmq-client-csharp/Session.cs | 19 +--
csharp/rocketmq-client-csharp/Settings.cs | 29 ++++-
csharp/rocketmq-client-csharp/Signature.cs | 53 ++++-----
csharp/rocketmq-client-csharp/StatusChecker.cs | 2 +-
.../SubscriptionLoadBalancer.cs | 1 -
.../rocketmq-client-csharp/TopicRouteException.cs | 1 +
csharp/tests/MessageIdGeneratorTest.cs | 4 +-
csharp/tests/SequenceGeneratorTest.cs | 49 --------
42 files changed, 395 insertions(+), 588 deletions(-)
diff --git a/csharp/examples/ProducerNormalMessageExample.cs b/csharp/examples/ProducerNormalMessageExample.cs
index 1f80671c..16791a13 100644
--- a/csharp/examples/ProducerNormalMessageExample.cs
+++ b/csharp/examples/ProducerNormalMessageExample.cs
@@ -24,7 +24,7 @@ using Org.Apache.Rocketmq;
namespace examples
{
- static class ProducerNormalMessageExample
+ internal static class ProducerNormalMessageExample
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
diff --git a/csharp/rocketmq-client-csharp/AccessPoint.cs b/csharp/rocketmq-client-csharp/AccessPoint.cs
deleted file mode 100644
index f05fa293..00000000
--- a/csharp/rocketmq-client-csharp/AccessPoint.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using rmq = Apache.Rocketmq.V2;
-using System.Net;
-using System.Net.Sockets;
-
-namespace Org.Apache.Rocketmq
-{
- public class AccessPoint
- {
- public AccessPoint()
- {
-
- }
-
- public AccessPoint(string accessUrl)
- {
- string[] segments = accessUrl.Split(":");
- if (segments.Length != 2)
- {
- throw new ArgumentException("Access url should be of format host:port");
- }
-
- Host = segments[0];
- Port = Int32.Parse(segments[1]);
- }
-
- public string Host { get; }
-
- public int Port { get; set; }
-
- public string TargetUrl()
- {
- return $"https://{Host}:{Port}";
- }
-
- public rmq::AddressScheme HostScheme()
- {
- return SchemeOf(Host);
- }
-
- private static rmq::AddressScheme SchemeOf(string host)
- {
- var result = IPAddress.TryParse(host, out var ip);
- if (!result)
- {
- return rmq::AddressScheme.DomainName;
- }
-
- return ip.AddressFamily switch
- {
- AddressFamily.InterNetwork => rmq::AddressScheme.Ipv4,
- AddressFamily.InterNetworkV6 => rmq::AddressScheme.Ipv6,
- _ => rmq::AddressScheme.Unspecified
- };
- }
- }
-}
diff --git a/csharp/rocketmq-client-csharp/Address.cs b/csharp/rocketmq-client-csharp/Address.cs
index 316323c9..fca83530 100644
--- a/csharp/rocketmq-client-csharp/Address.cs
+++ b/csharp/rocketmq-client-csharp/Address.cs
@@ -57,8 +57,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- if (obj.GetType() != this.GetType()) return false;
- return Equals((Address)obj);
+ return obj.GetType() == GetType() && Equals((Address)obj);
}
public override int GetHashCode()
diff --git a/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs b/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs
index 5b793f37..b8aff27a 100644
--- a/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs
+++ b/csharp/rocketmq-client-csharp/AddressListEqualityComparer.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System.Collections.Generic;
using System.Linq;
diff --git a/csharp/rocketmq-client-csharp/AddressScheme.cs b/csharp/rocketmq-client-csharp/AddressScheme.cs
index f9c1c290..6f36f546 100644
--- a/csharp/rocketmq-client-csharp/AddressScheme.cs
+++ b/csharp/rocketmq-client-csharp/AddressScheme.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
diff --git a/csharp/rocketmq-client-csharp/Broker.cs b/csharp/rocketmq-client-csharp/Broker.cs
index 370ac96a..6b426a5a 100644
--- a/csharp/rocketmq-client-csharp/Broker.cs
+++ b/csharp/rocketmq-client-csharp/Broker.cs
@@ -1,10 +1,27 @@
-using rmq = Apache.Rocketmq.V2;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class Broker
{
- public Broker(rmq.Broker broker)
+ public Broker(Proto.Broker broker)
{
Name = broker.Name;
Id = broker.Id;
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index a1c4b821..26ff9fcc 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -41,7 +41,7 @@ namespace Org.Apache.Rocketmq
private readonly CancellationTokenSource _settingsSyncCtx;
protected readonly ClientConfig ClientConfig;
- protected readonly IClientManager Manager;
+ protected readonly IClientManager ClientManager;
protected readonly string ClientId;
protected readonly ConcurrentDictionary<string, bool> Topics;
@@ -58,7 +58,7 @@ namespace Org.Apache.Rocketmq
Topics = topics;
ClientId = Utilities.GetClientId();
- Manager = new ClientManager(this);
+ ClientManager = new ClientManager(this);
_topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>();
@@ -91,7 +91,7 @@ namespace Org.Apache.Rocketmq
_topicRouteUpdateCtx.Cancel();
_heartbeatCts.Cancel();
_telemetryCts.Cancel();
- await Manager.Shutdown();
+ await ClientManager.Shutdown();
Logger.Debug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
}
@@ -120,7 +120,7 @@ namespace Org.Apache.Rocketmq
return (false, session);
}
- var stream = Manager.Telemetry(endpoints);
+ var stream = ClientManager.Telemetry(endpoints);
var created = new Session(endpoints, stream, this);
_sessionsTable.Add(endpoints, created);
return (true, created);
@@ -134,7 +134,7 @@ namespace Org.Apache.Rocketmq
protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
- protected abstract void OnTopicDataFetched0(string topic, TopicRouteData topicRouteData);
+ protected abstract void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData);
private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
@@ -158,7 +158,7 @@ namespace Org.Apache.Rocketmq
}
_topicRouteCache[topic] = topicRouteData;
- OnTopicDataFetched0(topic, topicRouteData);
+ OnTopicRouteDataFetched0(topic, topicRouteData);
}
@@ -198,16 +198,26 @@ namespace Org.Apache.Rocketmq
}
}
- private static void ScheduleWithFixedDelay(Action action, TimeSpan period, CancellationToken token)
+ private void ScheduleWithFixedDelay(Action action, TimeSpan period, CancellationToken token)
{
Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
- action();
- await Task.Delay(period, token);
+ try
+ {
+ action();
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, $"Failed to execute scheduled task, ClientId={ClientId}");
+ }
+ finally
+ {
+ await Task.Delay(period, token);
+ }
}
- });
+ }, token);
}
protected async Task<TopicRouteData> FetchTopicRoute(string topic)
@@ -232,7 +242,7 @@ namespace Org.Apache.Rocketmq
};
var response =
- await Manager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout);
+ await ClientManager.QueryRoute(ClientConfig.Endpoints, request, ClientConfig.RequestTimeout);
var code = response.Status.Code;
if (!Proto.Code.Ok.Equals(code))
{
@@ -245,7 +255,7 @@ namespace Org.Apache.Rocketmq
return new TopicRouteData(messageQueues);
}
- public async void Heartbeat()
+ private async void Heartbeat()
{
var endpoints = GetTotalRouteEndpoints();
if (0 == endpoints.Count)
@@ -259,7 +269,7 @@ namespace Org.Apache.Rocketmq
// Collect task into a map.
foreach (var item in endpoints)
{
- var task = Manager.Heartbeat(item, request, ClientConfig.RequestTimeout);
+ var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
responses[item]= task;
}
foreach (var item in responses.Keys)
@@ -276,12 +286,14 @@ namespace Org.Apache.Rocketmq
Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, statusMessage={statusMessage}, clientId={ClientId}");
}
}
+
+
public grpc.Metadata Sign()
{
var metadata = new grpc::Metadata();
- Signature.Sign(ClientConfig, metadata);
+ Signature.Sign(this, metadata);
return metadata;
}
@@ -294,7 +306,7 @@ namespace Org.Apache.Rocketmq
};
foreach (var item in endpoints)
{
- var response = await Manager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
+ var response = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
try
{
StatusChecker.Check(response.Status, request);
@@ -319,6 +331,11 @@ namespace Org.Apache.Rocketmq
return ClientId;
}
+ public ClientConfig GetClientConfig()
+ {
+ return ClientConfig;
+ }
+
public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
Proto.RecoverOrphanedTransactionCommand command)
{
diff --git a/csharp/rocketmq-client-csharp/ClientConfig.cs b/csharp/rocketmq-client-csharp/ClientConfig.cs
index e5fd8643..7e434eae 100644
--- a/csharp/rocketmq-client-csharp/ClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/ClientConfig.cs
@@ -22,13 +22,8 @@ namespace Org.Apache.Rocketmq
{
public class ClientConfig : IClientConfig
{
- private static long _instanceSequence = 0;
-
public ClientConfig(string endpoints)
{
- var hostName = System.Net.Dns.GetHostName();
- var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
- ClientId = $"{hostName}@{pid}@{Interlocked.Increment(ref _instanceSequence)}";
RequestTimeout = TimeSpan.FromSeconds(3);
Endpoints = new Endpoints(endpoints);
}
@@ -37,9 +32,6 @@ namespace Org.Apache.Rocketmq
public TimeSpan RequestTimeout { get; set; }
- public string ClientId { get; }
-
-
public Endpoints Endpoints { get; }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
index d9622291..890ce877 100644
--- a/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
+++ b/csharp/rocketmq-client-csharp/ClientLoggerInterceptor.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Threading.Tasks;
using Grpc.Core;
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index bd18ebc4..3eef2fe6 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -21,7 +21,6 @@ using System.Threading;
using System.Threading.Tasks;
using grpc = Grpc.Core;
using System.Collections.Generic;
-using NLog;
namespace Org.Apache.Rocketmq
{
diff --git a/csharp/rocketmq-client-csharp/ClientType.cs b/csharp/rocketmq-client-csharp/ClientType.cs
index 15481c98..487b3bec 100644
--- a/csharp/rocketmq-client-csharp/ClientType.cs
+++ b/csharp/rocketmq-client-csharp/ClientType.cs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -28,19 +28,15 @@ namespace Org.Apache.Rocketmq
public static class ClientTypeHelper
{
- public static rmq.ClientType ToProtobuf(ClientType clientType)
+ public static Proto.ClientType ToProtobuf(ClientType clientType)
{
- switch (clientType)
+ return clientType switch
{
- case ClientType.Producer:
- return rmq.ClientType.Producer;
- case ClientType.SimpleConsumer:
- return rmq.ClientType.SimpleConsumer;
- case ClientType.PushConsumer:
- return rmq.ClientType.PushConsumer;
- default:
- return rmq.ClientType.Unspecified;
- }
+ ClientType.Producer => Proto.ClientType.Producer,
+ ClientType.SimpleConsumer => Proto.ClientType.SimpleConsumer,
+ ClientType.PushConsumer => Proto.ClientType.PushConsumer,
+ _ => Proto.ClientType.Unspecified
+ };
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
index 73d05f63..7764dc34 100644
--- a/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/ConfigFileCredentialsProvider.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System.IO;
using System;
using System.Text.Json;
diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs
index e7cf5f9c..54d8f0d2 100644
--- a/csharp/rocketmq-client-csharp/Endpoints.cs
+++ b/csharp/rocketmq-client-csharp/Endpoints.cs
@@ -1,19 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Collections.Generic;
using System.Linq;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class Endpoints : IEquatable<Endpoints>
{
private static readonly AddressListEqualityComparer AddressListComparer = new();
- private static readonly string EndpointSeparator = ":";
+ private const string EndpointSeparator = ":";
private List<Address> Addresses { get; }
private AddressScheme Scheme { get; }
private readonly int _hashCode;
- public Endpoints(global::Apache.Rocketmq.V2.Endpoints endpoints)
+ public Endpoints(Proto.Endpoints endpoints)
{
Addresses = new List<Address>();
foreach (var address in endpoints.Addresses)
@@ -28,13 +45,14 @@ namespace Org.Apache.Rocketmq
switch (endpoints.Scheme)
{
- case rmq.AddressScheme.Ipv4:
+ case Proto.AddressScheme.Ipv4:
Scheme = AddressScheme.Ipv4;
break;
- case rmq.AddressScheme.Ipv6:
+ case Proto.AddressScheme.Ipv6:
Scheme = AddressScheme.Ipv6;
break;
- case rmq.AddressScheme.DomainName:
+ case Proto.AddressScheme.DomainName:
+ case Proto.AddressScheme.Unspecified:
default:
Scheme = AddressScheme.DomainName;
if (Addresses.Count > 1)
@@ -123,9 +141,9 @@ namespace Org.Apache.Rocketmq
return _hashCode;
}
- public rmq.Endpoints ToProtobuf()
+ public Proto.Endpoints ToProtobuf()
{
- var endpoints = new rmq.Endpoints();
+ var endpoints = new Proto.Endpoints();
foreach (var address in Addresses)
{
endpoints.Addresses.Add(address.ToProtobuf());
diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
index e987d979..094c2607 100644
--- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs
@@ -4,11 +4,11 @@ using Google.Protobuf.WellKnownTypes;
namespace Org.Apache.Rocketmq
{
- public class ExponentialBackoffRetryPolicy : RetryPolicy
+ public class ExponentialBackoffRetryPolicy : IRetryPolicy
{
- private int _maxAttempts;
+ private readonly int _maxAttempts;
- public ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
+ private ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff,
double backoffMultiplier)
{
_maxAttempts = maxAttempts;
@@ -17,7 +17,7 @@ namespace Org.Apache.Rocketmq
BackoffMultiplier = backoffMultiplier;
}
- public int getMaxAttempts()
+ public int GetMaxAttempts()
{
return _maxAttempts;
}
@@ -28,17 +28,17 @@ namespace Org.Apache.Rocketmq
public double BackoffMultiplier { get; }
- public TimeSpan getNextAttemptDelay(int attempt)
+ public TimeSpan GetNextAttemptDelay(int attempt)
{
return TimeSpan.Zero;
}
- public static ExponentialBackoffRetryPolicy immediatelyRetryPolicy(int maxAttempts)
+ public static ExponentialBackoffRetryPolicy ImmediatelyRetryPolicy(int maxAttempts)
{
return new ExponentialBackoffRetryPolicy(maxAttempts, TimeSpan.Zero, TimeSpan.Zero, 1);
}
- public global::Apache.Rocketmq.V2.RetryPolicy toProtobuf()
+ public global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf()
{
var exponentialBackoff = new ExponentialBackoff
{
diff --git a/csharp/rocketmq-client-csharp/IClient.cs b/csharp/rocketmq-client-csharp/IClient.cs
index db219af9..5ba4c6f1 100644
--- a/csharp/rocketmq-client-csharp/IClient.cs
+++ b/csharp/rocketmq-client-csharp/IClient.cs
@@ -23,22 +23,44 @@ namespace Org.Apache.Rocketmq
{
public interface IClient
{
- void Heartbeat();
-
- void NotifyClientTermination(Proto.Resource group);
-
CancellationTokenSource TelemetryCts();
+ ClientConfig GetClientConfig();
+
Proto.Settings GetSettings();
+ /// <summary>
+ /// Get the identifier of current client.
+ /// </summary>
+ /// <returns>Client identifier.</returns>
string GetClientId();
+ /// <summary>
+ /// This method will be triggered when client settings are received from remote endpoints.
+ /// </summary>
+ /// <param name="endpoints">Remote endpoints.</param>
+ /// <param name="settings">Client settings received from the remote endpoints.</param>
void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings);
+ /// <summary>
+ /// This method will be triggered when an orphaned transaction needs to be recovered.
+ /// </summary>
+ /// <param name="endpoints">Remote endpoints.</param>
+ /// <param name="command">Command of orphaned transaction recovery.</param>
void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command);
+ /// <summary>
+ /// This method will be triggered when a message verification command is received.
+ /// </summary>
+ /// <param name="endpoints">Remote endpoints.</param>
+ /// <param name="command">Command of message verification.</param>
void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command);
+ /// <summary>
+ /// This method will be triggered when a thread stack trace command is received.
+ /// </summary>
+ /// <param name="endpoints">Remote endpoints.</param>
+ /// <param name="command">Command of printing thread stack trace.</param>
void OnPrintThreadStackTraceCommand(Endpoints endpoints, Proto.PrintThreadStackTraceCommand command);
Metadata Sign();
diff --git a/csharp/rocketmq-client-csharp/IClientConfig.cs b/csharp/rocketmq-client-csharp/IClientConfig.cs
index 5603a616..a50bdf93 100644
--- a/csharp/rocketmq-client-csharp/IClientConfig.cs
+++ b/csharp/rocketmq-client-csharp/IClientConfig.cs
@@ -15,21 +15,10 @@
* limitations under the License.
*/
-using System;
-
namespace Org.Apache.Rocketmq
{
public interface IClientConfig
{
-
- ICredentialsProvider CredentialsProvider
- {
- get;
- }
-
- string ClientId
- {
- get;
- }
+ ICredentialsProvider CredentialsProvider { get; }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index beb8880b..f2e48e36 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -19,15 +19,13 @@ using System.Threading.Tasks;
using System;
using System.Collections.Generic;
using Apache.Rocketmq.V2;
-using grpc = Grpc.Core;
-using rmq = Apache.Rocketmq.V2;
-
+using Grpc.Core;
namespace Org.Apache.Rocketmq
{
public interface IClientManager
{
- grpc::AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints);
+ AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Endpoints endpoints);
Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout);
diff --git a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
index 2f6e71eb..e98df14a 100644
--- a/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
+++ b/csharp/rocketmq-client-csharp/ICredentialsProvider.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
namespace Org.Apache.Rocketmq
{
public interface ICredentialsProvider
diff --git a/csharp/rocketmq-client-csharp/Message.cs b/csharp/rocketmq-client-csharp/Message.cs
index fb004da3..9993b52f 100644
--- a/csharp/rocketmq-client-csharp/Message.cs
+++ b/csharp/rocketmq-client-csharp/Message.cs
@@ -20,14 +20,15 @@ using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
-
public class Message
{
public Message() : this(null, null)
{
}
- public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body) { }
+ public Message(string topic, byte[] body) : this(topic, null, new List<string>(), body)
+ {
+ }
public Message(string topic, string tag, byte[] body) : this(topic, tag, new List<string>(), body)
{
@@ -35,7 +36,6 @@ namespace Org.Apache.Rocketmq
public Message(string topic, string tag, List<string> keys, byte[] body)
{
- MaxAttemptTimes = 3;
Topic = topic;
Tag = tag;
Keys = keys;
@@ -44,73 +44,20 @@ namespace Org.Apache.Rocketmq
DeliveryTimestamp = null;
}
- public string Topic
- {
- get;
- set;
- }
+ public string Topic { get; set; }
- public byte[] Body
- {
- get;
- set;
- }
+ public byte[] Body { get; set; }
- public string Tag
- {
- get;
- set;
- }
+ public string Tag { get; set; }
- public List<string> Keys
- {
- get;
- set;
- }
- public Dictionary<string, string> UserProperties
- {
- get;
- set;
- }
+ public List<string> Keys { get; set; }
+ public Dictionary<string, string> UserProperties { get; set; }
- public int MaxAttemptTimes
- {
- get;
- set;
- }
+ public DateTime? DeliveryTimestamp { get; set; }
- public DateTime? DeliveryTimestamp
- {
- get;
- set;
- }
-
- public int DeliveryAttempt
- {
- get;
- internal set;
- }
-
- public string MessageGroup
- {
- get;
- set;
- }
-
- public bool Fifo()
- {
- return !String.IsNullOrEmpty(MessageGroup);
- }
-
- public bool Scheduled()
- {
- return DeliveryTimestamp > DateTime.UtcNow;
- }
+ public int DeliveryAttempt { get; internal set; }
- internal bool _checksumVerifiedOk = true;
- internal string _receiptHandle;
- internal string _sourceHost;
+ public string MessageGroup { get; set; }
}
-
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageException.cs b/csharp/rocketmq-client-csharp/MessageException.cs
deleted file mode 100644
index 7ef10df3..00000000
--- a/csharp/rocketmq-client-csharp/MessageException.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-namespace Org.Apache.Rocketmq
-{
- [Serializable]
- public class MessageException : Exception
- {
- public MessageException(string message) : base(message)
- {
- }
- }
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
index 8dc370d1..60620ef0 100644
--- a/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
+++ b/csharp/rocketmq-client-csharp/MessageIdGenerator.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.Rocketmq
*/
public class MessageIdGenerator
{
- public static readonly string version = "01";
+ public const string Version = "01";
private static readonly MessageIdGenerator Instance = new();
private readonly string _prefix;
@@ -39,15 +39,15 @@ namespace Org.Apache.Rocketmq
private MessageIdGenerator()
{
- MemoryStream stream = new MemoryStream();
- BinaryWriter writer = new BinaryWriter(stream);
+ var stream = new MemoryStream();
+ var writer = new BinaryWriter(stream);
var macAddress = Utilities.GetMacAddress();
writer.Write(macAddress, 0, 6);
- int processId = Utilities.GetProcessId();
+ var processId = Utilities.GetProcessId();
- byte[] processIdBytes = BitConverter.GetBytes(processId);
+ var processIdBytes = BitConverter.GetBytes(processId);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(processIdBytes);
@@ -55,9 +55,9 @@ namespace Org.Apache.Rocketmq
writer.Write(processIdBytes, 2, 2);
var array = stream.ToArray();
- _prefix = version + Utilities.ByteArrayToHexString(array);
+ _prefix = Version + Utilities.ByteArrayToHexString(array);
- DateTime epoch = new DateTime(2021, 1, 1,
+ var epoch = new DateTime(2021, 1, 1,
0, 0, 0, 0, DateTimeKind.Utc);
var now = DateTime.Now;
@@ -67,12 +67,12 @@ namespace Org.Apache.Rocketmq
_sequence = 0;
}
- public String Next()
+ public string Next()
{
- long deltaSeconds = _secondsSinceCustomEpoch + _stopwatch.ElapsedMilliseconds / 1_000;
+ var deltaSeconds = _secondsSinceCustomEpoch + _stopwatch.ElapsedMilliseconds / 1_000;
- MemoryStream stream = new MemoryStream();
- BinaryWriter writer = new BinaryWriter(stream);
+ var stream = new MemoryStream();
+ var writer = new BinaryWriter(stream);
byte[] deltaSecondsBytes = BitConverter.GetBytes(deltaSeconds);
if (BitConverter.IsLittleEndian)
@@ -82,8 +82,8 @@ namespace Org.Apache.Rocketmq
writer.Write(deltaSecondsBytes, 4, 4);
- int no = Interlocked.Increment(ref _sequence);
- byte[] noBytes = BitConverter.GetBytes(no);
+ var no = Interlocked.Increment(ref _sequence);
+ var noBytes = BitConverter.GetBytes(no);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(noBytes);
diff --git a/csharp/rocketmq-client-csharp/MessageQueue.cs b/csharp/rocketmq-client-csharp/MessageQueue.cs
index 385e392c..cd6f0ce3 100644
--- a/csharp/rocketmq-client-csharp/MessageQueue.cs
+++ b/csharp/rocketmq-client-csharp/MessageQueue.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System.Collections.Generic;
using rmq = Apache.Rocketmq.V2;
diff --git a/csharp/rocketmq-client-csharp/MessageType.cs b/csharp/rocketmq-client-csharp/MessageType.cs
index 6338e365..09896a0b 100644
--- a/csharp/rocketmq-client-csharp/MessageType.cs
+++ b/csharp/rocketmq-client-csharp/MessageType.cs
@@ -16,7 +16,7 @@
*/
using Org.Apache.Rocketmq.Error;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -30,38 +30,34 @@ namespace Org.Apache.Rocketmq
public static class MessageTypeHelper
{
- public static MessageType FromProtobuf(rmq.MessageType messageType)
+ public static MessageType FromProtobuf(Proto.MessageType messageType)
{
switch (messageType)
{
- case rmq.MessageType.Normal:
+ case Proto.MessageType.Normal:
return MessageType.Normal;
- case rmq.MessageType.Fifo:
+ case Proto.MessageType.Fifo:
return MessageType.Fifo;
- case rmq.MessageType.Delay:
+ case Proto.MessageType.Delay:
return MessageType.Delay;
- case rmq.MessageType.Transaction:
+ case Proto.MessageType.Transaction:
return MessageType.Transaction;
+ case Proto.MessageType.Unspecified:
default:
throw new InternalErrorException("MessageType is not specified");
}
}
- public static rmq.MessageType ToProtobuf(MessageType messageType)
+ public static Proto.MessageType ToProtobuf(MessageType messageType)
{
- switch (messageType)
+ return messageType switch
{
- case MessageType.Normal:
- return rmq.MessageType.Normal;
- case MessageType.Fifo:
- return rmq.MessageType.Fifo;
- case MessageType.Delay:
- return rmq.MessageType.Delay;
- case MessageType.Transaction:
- return rmq.MessageType.Transaction;
- default:
- return rmq.MessageType.Unspecified;
- }
+ MessageType.Normal => Proto.MessageType.Normal,
+ MessageType.Fifo => Proto.MessageType.Fifo,
+ MessageType.Delay => Proto.MessageType.Delay,
+ MessageType.Transaction => Proto.MessageType.Transaction,
+ _ => Proto.MessageType.Unspecified
+ };
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index 57b573ac..26d7fcc6 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -79,7 +79,7 @@ namespace Org.Apache.Rocketmq
public int DeliveryAttempt { get; }
- public static MessageView fromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue)
+ public static MessageView FromProtobuf(rmq.Message message, rmq.MessageQueue messageQueue)
{
var topic = message.Topic.Name;
var systemProperties = message.SystemProperties;
diff --git a/csharp/rocketmq-client-csharp/MqEncoding.cs b/csharp/rocketmq-client-csharp/MqEncoding.cs
index ba2a489d..27dfb052 100644
--- a/csharp/rocketmq-client-csharp/MqEncoding.cs
+++ b/csharp/rocketmq-client-csharp/MqEncoding.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
@@ -12,15 +29,12 @@ namespace Org.Apache.Rocketmq
{
public static rmq.Encoding ToProtobuf(MqEncoding mqEncoding)
{
- switch (mqEncoding)
+ return mqEncoding switch
{
- case MqEncoding.Gzip:
- return rmq.Encoding.Gzip;
- case MqEncoding.Identity:
- return rmq.Encoding.Identity;
- default:
- return rmq.Encoding.Unspecified;
- }
+ MqEncoding.Gzip => rmq.Encoding.Gzip,
+ MqEncoding.Identity => rmq.Encoding.Identity,
+ _ => rmq.Encoding.Unspecified
+ };
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MqLogManager.cs b/csharp/rocketmq-client-csharp/MqLogManager.cs
index dcbdce57..7fa2b7bf 100644
--- a/csharp/rocketmq-client-csharp/MqLogManager.cs
+++ b/csharp/rocketmq-client-csharp/MqLogManager.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.IO;
using System.Reflection;
diff --git a/csharp/rocketmq-client-csharp/Permission.cs b/csharp/rocketmq-client-csharp/Permission.cs
index d5fe6348..eedd6247 100644
--- a/csharp/rocketmq-client-csharp/Permission.cs
+++ b/csharp/rocketmq-client-csharp/Permission.cs
@@ -48,32 +48,30 @@ namespace Org.Apache.Rocketmq
}
public static bool IsWritable(Permission permission) {
- if (Permission.Write.Equals(permission))
- {
- return true;
- }
-
- if (Permission.ReadWrite.Equals(permission))
+ switch (permission)
{
- return true;
+ case Permission.Write:
+ case Permission.ReadWrite:
+ return true;
+ case Permission.None:
+ case Permission.Read:
+ default:
+ return false;
}
-
- return false;
}
public static bool IsReadable(Permission permission)
{
- if (Permission.Read.Equals(permission))
- {
- return true;
- }
-
- if (Permission.ReadWrite.Equals(permission))
+ switch (permission)
{
- return true;
+ case Permission.Read:
+ case Permission.ReadWrite:
+ return true;
+ case Permission.None:
+ case Permission.Write:
+ default:
+ return false;
}
-
- return false;
}
}
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 56ec5b40..c2782fc0 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -27,7 +44,7 @@ namespace Org.Apache.Rocketmq
private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> topics, int maxAttempts) :
base(clientConfig, topics)
{
- var retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
+ var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
_publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
clientConfig.RequestTimeout, topics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
@@ -63,27 +80,37 @@ namespace Org.Apache.Rocketmq
};
}
- protected override void OnTopicDataFetched0(string topic, TopicRouteData topicRouteData)
+ private async Task<PublishingLoadBalancer> GetPublishingLoadBalancer(string topic)
{
+ if (_publishingRouteDataCache.TryGetValue(topic, out var publishingLoadBalancer))
+ {
+ return publishingLoadBalancer;
+ }
+
+ var topicRouteData = await FetchTopicRoute(topic);
+ publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+ _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
+
+ return publishingLoadBalancer;
}
- private RetryPolicy GetRetryPolicy()
+ protected override void OnTopicRouteDataFetched0(string topic, TopicRouteData topicRouteData)
+ {
+ var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+ _publishingRouteDataCache.TryAdd(topic, publishingLoadBalancer);
+ }
+
+ private IRetryPolicy GetRetryPolicy()
{
return _publishingSettings.GetRetryPolicy();
}
public async Task<SendReceipt> Send(Message message)
{
- if (!_publishingRouteDataCache.TryGetValue(message.Topic, out var publishingLoadBalancer))
- {
- var topicRouteData = await FetchTopicRoute(message.Topic);
- publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
- _publishingRouteDataCache.TryAdd(message.Topic, publishingLoadBalancer);
- }
-
+ var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic);
var publishingMessage = new PublishingMessage(message, _publishingSettings, false);
var retryPolicy = GetRetryPolicy();
- var maxAttempts = retryPolicy.getMaxAttempts();
+ var maxAttempts = retryPolicy.GetMaxAttempts();
var candidates = publishingLoadBalancer.TakeMessageQueues(publishingMessage.MessageGroup, maxAttempts);
Exception exception = null;
for (var attempt = 0; attempt < maxAttempts; attempt++)
@@ -102,7 +129,7 @@ namespace Org.Apache.Rocketmq
throw exception!;
}
- private Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
+ private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
{
return new Proto.SendMessageRequest
{
@@ -125,11 +152,11 @@ namespace Org.Apache.Rocketmq
var sendMessageRequest = WrapSendMessageRequest(message, mq);
var endpoints = mq.Broker.Endpoints;
- Proto.SendMessageResponse response =
- await Manager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+ var response =
+ await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
try
{
- var sendReceipts = SendReceipt.processSendMessageResponse(response);
+ var sendReceipts = SendReceipt.ProcessSendMessageResponse(response);
var sendReceipt = sendReceipts.First();
if (attempt > 1)
diff --git a/csharp/rocketmq-client-csharp/PublishingMessage.cs b/csharp/rocketmq-client-csharp/PublishingMessage.cs
index 93eb2de6..7839edaa 100644
--- a/csharp/rocketmq-client-csharp/PublishingMessage.cs
+++ b/csharp/rocketmq-client-csharp/PublishingMessage.cs
@@ -19,7 +19,7 @@ using System;
using System.IO;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
using Org.Apache.Rocketmq.Error;
namespace Org.Apache.Rocketmq
@@ -31,7 +31,7 @@ namespace Org.Apache.Rocketmq
{
public MessageType MessageType { set; get; }
- public String MessageId { get; }
+ private string MessageId { get; }
public PublishingMessage(Message message, PublishingSettings publishingSettings, bool txEnabled) : base(
message.Topic, message.Body)
@@ -45,7 +45,7 @@ namespace Org.Apache.Rocketmq
// Generate message id.
MessageId = MessageIdGenerator.GetInstance().Next();
// For NORMAL message.
- if (String.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue &&
+ if (string.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue &&
!txEnabled)
{
MessageType = MessageType.Normal;
@@ -53,7 +53,7 @@ namespace Org.Apache.Rocketmq
}
// For FIFO message.
- if (!String.IsNullOrEmpty(message.MessageGroup) && !txEnabled)
+ if (!string.IsNullOrEmpty(message.MessageGroup) && !txEnabled)
{
MessageType = MessageType.Fifo;
return;
@@ -67,18 +67,15 @@ namespace Org.Apache.Rocketmq
}
// For TRANSACTION message.
- if (!String.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue && txEnabled)
- {
- MessageType = MessageType.Transaction;
- return;
- }
-
- throw new InternalErrorException("Transactional message should not set messageGroup or deliveryTimestamp");
+ if (string.IsNullOrEmpty(message.MessageGroup) || message.DeliveryTimestamp.HasValue || !txEnabled)
+ throw new InternalErrorException(
+ "Transactional message should not set messageGroup or deliveryTimestamp");
+ MessageType = MessageType.Transaction;
}
- public rmq::Message ToProtobuf(int queueId)
+ public Proto::Message ToProtobuf(int queueId)
{
- rmq.SystemProperties systemProperties = new rmq.SystemProperties
+ var systemProperties = new Proto.SystemProperties
{
Keys = { Keys },
MessageId = MessageId,
@@ -103,11 +100,11 @@ namespace Org.Apache.Rocketmq
systemProperties.MessageGroup = MessageGroup;
}
- rmq.Resource topicResource = new rmq.Resource
+ var topicResource = new Proto.Resource
{
Name = Topic
};
- return new rmq.Message
+ return new Proto.Message
{
Topic = topicResource,
Body = ByteString.CopyFrom(Body),
diff --git a/csharp/rocketmq-client-csharp/PublishingSettings.cs b/csharp/rocketmq-client-csharp/PublishingSettings.cs
index b543cb71..023b0be3 100644
--- a/csharp/rocketmq-client-csharp/PublishingSettings.cs
+++ b/csharp/rocketmq-client-csharp/PublishingSettings.cs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -55,7 +72,7 @@ namespace Org.Apache.Rocketmq
AccessPoint = AccessPoint.ToProtobuf(),
ClientType = ClientTypeHelper.ToProtobuf(ClientType),
RequestTimeout = Duration.FromTimeSpan(RequestTimeout),
- BackoffPolicy = RetryPolicy.toProtobuf(),
+ BackoffPolicy = RetryPolicy.ToProtobuf(),
UserAgent = UserAgent.Instance.ToProtobuf()
};
}
diff --git a/csharp/rocketmq-client-csharp/Resource.cs b/csharp/rocketmq-client-csharp/Resource.cs
index 5af67d1e..a1825f15 100644
--- a/csharp/rocketmq-client-csharp/Resource.cs
+++ b/csharp/rocketmq-client-csharp/Resource.cs
@@ -1,11 +1,11 @@
using System;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
public class Resource
{
- public Resource(rmq.Resource resource)
+ public Resource(Proto.Resource resource)
{
Namespace = resource.ResourceNamespace;
Name = resource.Name;
@@ -14,9 +14,9 @@ namespace Org.Apache.Rocketmq
public string Namespace { get; }
public string Name { get; }
- public rmq.Resource ToProtobuf()
+ public Proto.Resource ToProtobuf()
{
- return new rmq.Resource
+ return new Proto.Resource
{
ResourceNamespace = Namespace,
Name = Name
diff --git a/csharp/rocketmq-client-csharp/RetryPolicy.cs b/csharp/rocketmq-client-csharp/RetryPolicy.cs
index 9169b5d1..92b82013 100644
--- a/csharp/rocketmq-client-csharp/RetryPolicy.cs
+++ b/csharp/rocketmq-client-csharp/RetryPolicy.cs
@@ -2,12 +2,12 @@ using System;
namespace Org.Apache.Rocketmq
{
- public interface RetryPolicy
+ public interface IRetryPolicy
{
- int getMaxAttempts();
+ int GetMaxAttempts();
- TimeSpan getNextAttemptDelay(int attempt);
+ TimeSpan GetNextAttemptDelay(int attempt);
- global::Apache.Rocketmq.V2.RetryPolicy toProtobuf();
+ global::Apache.Rocketmq.V2.RetryPolicy ToProtobuf();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index a4ff1e3e..fa5c75c7 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -16,7 +16,8 @@
*/
using System.Collections.Generic;
-using rmq = Apache.Rocketmq.V2;
+using System.Linq;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -34,12 +35,12 @@ namespace Org.Apache.Rocketmq
return $"{nameof(MessageId)}: {MessageId}";
}
- public static List<SendReceipt> processSendMessageResponse(rmq.SendMessageResponse response)
+ public static List<SendReceipt> ProcessSendMessageResponse(Proto.SendMessageResponse response)
{
- rmq.Status status = response.Status;
+ var status = response.Status;
foreach (var entry in response.Entries)
{
- if (rmq.Code.Ok.Equals(entry.Status.Code))
+ if (Proto.Code.Ok.Equals(entry.Status.Code))
{
status = entry.Status;
}
@@ -47,13 +48,7 @@ namespace Org.Apache.Rocketmq
// May throw exception.
StatusChecker.Check(status, response);
- List<SendReceipt> sendReceipts = new List<SendReceipt>();
- foreach (var entry in response.Entries)
- {
- sendReceipts.Add(new SendReceipt(entry.MessageId));
- }
-
- return sendReceipts;
+ return response.Entries.Select(entry => new SendReceipt(entry.MessageId)).ToList();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SequenceGenerator.cs b/csharp/rocketmq-client-csharp/SequenceGenerator.cs
deleted file mode 100644
index 97a1eb91..00000000
--- a/csharp/rocketmq-client-csharp/SequenceGenerator.cs
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Threading;
-using System.Net.NetworkInformation;
-using NLog;
-
-namespace Org.Apache.Rocketmq
-{
- /**
- * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
- *
- * In the implementation layer, this class follows Singleton pattern.
- */
- public sealed class SequenceGenerator
- {
- private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
-
- public static SequenceGenerator Instance
- {
- get
- {
- return Nested.instance;
- }
- }
-
- private class Nested
- {
- static Nested()
- {
-
- }
-
- internal static readonly SequenceGenerator instance = new SequenceGenerator();
- }
-
- private SequenceGenerator()
- {
- currentSecond = SecondsSinceCustomEpoch();
- macAddress = MacAddress();
- pidBytes = ToArray(pid);
- if (BitConverter.IsLittleEndian)
- {
- Array.Reverse(version);
- }
- }
-
- /**
- * Sequence version, 2 bytes.
- */
- private static byte[] version = new byte[2] { 0x00, 0x01 };
-
- /**
- * MAC address, 6 bytes.
- */
- private byte[] macAddress;
-
- private int sequenceInSecond = 0;
- private int currentSecond;
-
- private static int pid = System.Diagnostics.Process.GetCurrentProcess().Id;
- private static byte[] pidBytes;
-
- private static byte[] ToArray(int number)
- {
- byte[] bytes = BitConverter.GetBytes(number);
- if (BitConverter.IsLittleEndian)
- Array.Reverse(bytes);
- return bytes;
- }
-
- private static int SecondsSinceCustomEpoch()
- {
- var customEpoch = new DateTime(2021, 01, 01, 00, 00, 00, DateTimeKind.Utc);
- var diff = DateTime.UtcNow.Subtract(customEpoch);
- return (int)diff.TotalSeconds;
- }
-
- private static byte[] MacAddress()
- {
- foreach (var nic in NetworkInterface.GetAllNetworkInterfaces())
- {
- if (nic.OperationalStatus == OperationalStatus.Up)
- {
- if (nic.Name.StartsWith("lo"))
- {
- continue;
- }
- Logger.Debug($"NIC={nic.Name}");
- return nic.GetPhysicalAddress().GetAddressBytes();
- }
- }
- return null;
- }
-
- public string Next()
- {
- byte[] data = new byte[18];
- Array.Copy(version, 0, data, 0, 2);
- Array.Copy(macAddress, 0, data, 2, 6);
- Array.Copy(pidBytes, 2, data, 8, 2);
- int second = SecondsSinceCustomEpoch();
- if (second != currentSecond)
- {
- currentSecond = second;
- Interlocked.Exchange(ref sequenceInSecond, 0);
- }
- byte[] secondBytes = ToArray(second);
- Array.Copy(secondBytes, 0, data, 10, 4);
- int sequence = Interlocked.Increment(ref sequenceInSecond);
- byte[] sequenceBytes = ToArray(sequence);
- Array.Copy(sequenceBytes, 0, data, 14, 4);
- return BitConverter.ToString(data).Replace("-", ""); ;
- }
- }
-
-}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Session.cs b/csharp/rocketmq-client-csharp/Session.cs
index 82f4f1ef..99d61268 100644
--- a/csharp/rocketmq-client-csharp/Session.cs
+++ b/csharp/rocketmq-client-csharp/Session.cs
@@ -34,14 +34,14 @@ namespace Org.Apache.Rocketmq
private readonly grpc::AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand>
_streamingCall;
- private readonly Client _client;
+ private readonly IClient _client;
private readonly Channel<bool> _channel;
private readonly Endpoints _endpoints;
private readonly SemaphoreSlim _semaphore;
public Session(Endpoints endpoints,
AsyncDuplexStreamingCall<Proto::TelemetryCommand, Proto::TelemetryCommand> streamingCall,
- Client client)
+ IClient client)
{
_endpoints = endpoints;
_semaphore = new SemaphoreSlim(1);
@@ -80,21 +80,6 @@ namespace Org.Apache.Rocketmq
}
}
- // public async void xx()
- // {
- // while (true)
- // {
- // var reader = _streamingCall.ResponseStream;
- // if (await reader.MoveNext(_client.TelemetryCts().Token))
- // {
- // var command = reader.Current;
- // Console.WriteLine("xxxxxxxx");
- // Console.WriteLine(command);
- // }
- // }
- // }
-
-
private void Loop()
{
Task.Run(async () =>
diff --git a/csharp/rocketmq-client-csharp/Settings.cs b/csharp/rocketmq-client-csharp/Settings.cs
index e7ea4e92..7716fc2d 100644
--- a/csharp/rocketmq-client-csharp/Settings.cs
+++ b/csharp/rocketmq-client-csharp/Settings.cs
@@ -1,5 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
using System;
-using rmq = Apache.Rocketmq.V2;
+using Proto = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq
{
@@ -8,10 +25,10 @@ namespace Org.Apache.Rocketmq
protected readonly string ClientId;
protected readonly ClientType ClientType;
protected readonly Endpoints AccessPoint;
- protected volatile RetryPolicy RetryPolicy;
+ protected volatile IRetryPolicy RetryPolicy;
protected readonly TimeSpan RequestTimeout;
- public Settings(string clientId, ClientType clientType, Endpoints accessPoint, RetryPolicy retryPolicy,
+ public Settings(string clientId, ClientType clientType, Endpoints accessPoint, IRetryPolicy retryPolicy,
TimeSpan requestTimeout)
{
ClientId = clientId;
@@ -30,11 +47,11 @@ namespace Org.Apache.Rocketmq
RequestTimeout = requestTimeout;
}
- public abstract rmq::Settings ToProtobuf();
+ public abstract Proto::Settings ToProtobuf();
- public abstract void Sync(rmq::Settings settings);
+ public abstract void Sync(Proto::Settings settings);
- public RetryPolicy GetRetryPolicy()
+ public IRetryPolicy GetRetryPolicy()
{
return RetryPolicy;
}
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index e65125b6..8588c25a 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Text;
using grpc = Grpc.Core;
@@ -23,43 +24,37 @@ namespace Org.Apache.Rocketmq
{
public static class Signature
{
- public static void Sign(IClientConfig clientConfig, grpc::Metadata metadata)
+ public static void Sign(IClient client, grpc::Metadata metadata)
{
+ var clientConfig = client.GetClientConfig();
metadata.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
metadata.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
- metadata.Add(MetadataConstants.ClientIdKey, clientConfig.ClientId);
-
- string time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
+ metadata.Add(MetadataConstants.ClientIdKey, client.GetClientId());
+
+ var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
metadata.Add(MetadataConstants.DateTimeKey, time);
- if (null != clientConfig.CredentialsProvider)
+ var credentials = clientConfig.CredentialsProvider?.Credentials;
+ if (credentials == null || credentials.expired())
{
- var credentials = clientConfig.CredentialsProvider.Credentials;
- if (null == credentials || credentials.expired())
- {
- return;
- }
-
- if (!String.IsNullOrEmpty(credentials.SessionToken))
- {
- metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
- }
+ return;
+ }
- byte[] secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret);
- byte[] data = Encoding.ASCII.GetBytes(time);
- HMACSHA1 signer = new HMACSHA1(secretData);
- byte[] digest = signer.ComputeHash(data);
- string hmac = BitConverter.ToString(digest).Replace("-", "");
- string authorization = string.Format("{0} {1}={2}, {3}={4}, {5}={6}",
- MetadataConstants.AlgorithmKey,
- MetadataConstants.CredentialKey,
- credentials.AccessKey,
- MetadataConstants.SignedHeadersKey,
- MetadataConstants.DateTimeKey,
- MetadataConstants.SignatureKey,
- hmac);
- metadata.Add(MetadataConstants.Authorization, authorization);
+ if (!string.IsNullOrEmpty(credentials.SessionToken))
+ {
+ metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
}
+
+ var secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret);
+ var data = Encoding.ASCII.GetBytes(time);
+ var signer = new HMACSHA1(secretData);
+ var digest = signer.ComputeHash(data);
+ var hmac = BitConverter.ToString(digest).Replace("-", "");
+ var authorization = $"{MetadataConstants.AlgorithmKey} " +
+ $"{MetadataConstants.CredentialKey}={credentials.AccessKey}, " +
+ $"{MetadataConstants.SignedHeadersKey}={MetadataConstants.DateTimeKey}, " +
+ $"{MetadataConstants.SignatureKey}={hmac}";
+ metadata.Add(MetadataConstants.Authorization, authorization);
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs
index cf15c204..641fd097 100644
--- a/csharp/rocketmq-client-csharp/StatusChecker.cs
+++ b/csharp/rocketmq-client-csharp/StatusChecker.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.Rocketmq
public static void Check(Proto.Status status, IMessage message)
{
- Proto.Code statusCode = status.Code;
+ var statusCode = status.Code;
var statusMessage = status.Message;
switch (statusCode)
diff --git a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
index cf803377..b77da833 100644
--- a/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
+++ b/csharp/rocketmq-client-csharp/SubscriptionLoadBalancer.cs
@@ -15,7 +15,6 @@
* limitations under the License.
*/
-using System;
using System.Collections.Generic;
using System.Threading;
using rmq = Apache.Rocketmq.V2;
diff --git a/csharp/rocketmq-client-csharp/TopicRouteException.cs b/csharp/rocketmq-client-csharp/TopicRouteException.cs
index 75462fd3..c80e8699 100644
--- a/csharp/rocketmq-client-csharp/TopicRouteException.cs
+++ b/csharp/rocketmq-client-csharp/TopicRouteException.cs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
namespace Org.Apache.Rocketmq
{
diff --git a/csharp/tests/MessageIdGeneratorTest.cs b/csharp/tests/MessageIdGeneratorTest.cs
index c98e1131..19d0cebd 100644
--- a/csharp/tests/MessageIdGeneratorTest.cs
+++ b/csharp/tests/MessageIdGeneratorTest.cs
@@ -29,11 +29,11 @@ namespace tests
MessageIdGenerator instance = MessageIdGenerator.GetInstance();
var firstMessageId = instance.Next();
Assert.AreEqual(34, firstMessageId.Length);
- Assert.AreEqual(MessageIdGenerator.version, firstMessageId.Substring(0, 2));
+ Assert.AreEqual(MessageIdGenerator.Version, firstMessageId.Substring(0, 2));
var secondMessageId = instance.Next();
Assert.AreEqual(34, secondMessageId.Length);
- Assert.AreEqual(MessageIdGenerator.version, secondMessageId.Substring(0, 2));
+ Assert.AreEqual(MessageIdGenerator.Version, secondMessageId.Substring(0, 2));
Assert.AreNotEqual(firstMessageId, secondMessageId);
Assert.AreEqual(firstMessageId.Substring(0, 24), secondMessageId.Substring(0, 24));
diff --git a/csharp/tests/SequenceGeneratorTest.cs b/csharp/tests/SequenceGeneratorTest.cs
deleted file mode 100644
index 9b553346..00000000
--- a/csharp/tests/SequenceGeneratorTest.cs
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
-using System;
-using System.Collections.Generic;
-
-namespace Org.Apache.Rocketmq
-{
- [TestClass]
- public class SequenceGeneratorTest
- {
-
- [ClassInitialize]
- public static void SetUp(TestContext context)
- {
- }
-
- [TestMethod]
- public void testNext()
- {
- var set = new HashSet<string>();
- for (int i = 0; i < 500000; i++)
- {
- var nextId = SequenceGenerator.Instance.Next();
- if (set.Contains(nextId))
- {
- Assert.Fail("SequenceGenerator violates uniqueness");
- }
- set.Add(nextId);
- }
- }
- }
-}
-