You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 06:23:53 UTC
[rocketmq-client-csharp] branch observability updated: Adjust message
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new 32f21f6 Adjust message
32f21f6 is described below
commit 32f21f65c583cf8723670a71b2834dcf7dddf7c2
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 14:23:44 2022 +0800
Adjust message
---
rocketmq-client-csharp/ClientManager.cs | 10 +--
rocketmq-client-csharp/Message.cs | 91 +++++++++++++--------------
rocketmq-client-csharp/Producer.cs | 26 +++++---
rocketmq-client-csharp/PublishLoadBalancer.cs | 14 ++---
tests/ProducerTest.cs | 15 +++--
5 files changed, 84 insertions(+), 72 deletions(-)
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 1168799..a79cbe3 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -219,7 +219,7 @@ namespace Org.Apache.Rocketmq
{
var msg = new Message();
msg.Topic = message.Topic.Name;
- msg.messageId = message.SystemProperties.MessageId;
+ msg.MessageId = message.SystemProperties.MessageId;
msg.Tag = message.SystemProperties.Tag;
// Validate message body checksum
@@ -229,7 +229,7 @@ namespace Org.Apache.Rocketmq
uint checksum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length);
if (!message.SystemProperties.BodyDigest.Checksum.Equals(checksum.ToString("X")))
{
- msg._bodyChecksumVerified = false;
+ msg._checksumVerifiedOk = false;
}
}
else if (rmq::DigestType.Md5 == message.SystemProperties.BodyDigest.Type)
@@ -237,7 +237,7 @@ namespace Org.Apache.Rocketmq
var checksum = MD5.HashData(raw);
if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
{
- msg._bodyChecksumVerified = false;
+ msg._checksumVerifiedOk = false;
}
}
else if (rmq::DigestType.Sha1 == message.SystemProperties.BodyDigest.Type)
@@ -245,7 +245,7 @@ namespace Org.Apache.Rocketmq
var checksum = SHA1.HashData(raw);
if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
{
- msg._bodyChecksumVerified = false;
+ msg._checksumVerifiedOk = false;
}
}
@@ -262,7 +262,7 @@ namespace Org.Apache.Rocketmq
msg.Keys.Add(key);
}
- msg._deliveryAttempt = message.SystemProperties.DeliveryAttempt;
+ msg.DeliveryAttempt = message.SystemProperties.DeliveryAttempt;
if (message.SystemProperties.BodyEncoding == rmq::Encoding.Gzip)
{
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index b8b0e98..2671a9d 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Collections.Generic;
namespace Org.Apache.Rocketmq
{
@@ -33,82 +34,80 @@ namespace Org.Apache.Rocketmq
public Message(string topic, string tag, List<string> keys, byte[] body)
{
- this.messageId = SequenceGenerator.Instance.Next();
- this.maxAttemptTimes = 3;
- this.topic = topic;
- this.tag = tag;
- this.keys = keys;
- this.body = body;
- this.userProperties = new Dictionary<string, string>();
- this.systemProperties = new Dictionary<string, string>();
+ MessageId = SequenceGenerator.Instance.Next();
+ MaxAttemptTimes = 3;
+ Topic = topic;
+ Tag = tag;
+ Keys = keys;
+ Body = body;
+ UserProperties = new Dictionary<string, string>();
}
- internal string messageId;
public string MessageId
{
- get { return messageId; }
+ get;
+ internal set;
}
-
- internal string _receiptHandle;
- internal string _sourceHost;
-
- internal int _deliveryAttempt;
- public int DeliveryAttempt
- {
- get { return _deliveryAttempt; }
- }
-
- internal bool _bodyChecksumVerified = true;
-
- private string topic;
-
+
public string Topic
{
- get { return topic; }
- set { this.topic = value; }
+ get;
+ set;
}
- private byte[] body;
public byte[] Body
{
- get { return body; }
- set { this.body = value; }
+ get;
+ set;
}
- private string tag;
public string Tag
{
- get { return tag; }
- set { this.tag = value; }
+ get;
+ set;
}
- private List<string> keys;
public List<string> Keys
{
- get { return keys; }
- set { this.keys = value; }
+ get;
+ set;
}
- private Dictionary<string, string> userProperties;
public Dictionary<string, string> UserProperties
{
- get { return userProperties; }
- set { this.userProperties = value; }
+ get;
+ set;
}
- private Dictionary<string, string> systemProperties;
- internal Dictionary<string, string> SystemProperties
+ public int MaxAttemptTimes
{
- get { return systemProperties; }
- set { this.systemProperties = value; }
+ get;
+ set;
}
- private int maxAttemptTimes;
- public int MaxAttemptTimes
+ private DateTime _deliveryTimestamp = DateTime.MinValue;
+
+ public DateTime DeliveryTimestamp
+ {
+ get => _deliveryTimestamp;
+ set => _deliveryTimestamp = value;
+ }
+
+ public int DeliveryAttempt
{
- get { return maxAttemptTimes; }
- set { maxAttemptTimes = value; }
+ get;
+ internal set;
}
+
+ public string MessageGroup
+ {
+ get;
+ set;
+ }
+
+ internal bool _checksumVerifiedOk = true;
+ internal string _receiptHandle;
+ internal string _sourceHost;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 452f699..60a0979 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -21,6 +21,7 @@ using rmq = Apache.Rocketmq.V2;
using System.Collections.Generic;
using System.Collections.Concurrent;
using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using NLog;
@@ -30,7 +31,7 @@ namespace Org.Apache.Rocketmq
{
public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
{
- this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+ _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
}
public override async Task Start()
@@ -54,7 +55,7 @@ namespace Org.Apache.Rocketmq
public async Task<SendReceipt> Send(Message message)
{
- if (!loadBalancer.ContainsKey(message.Topic))
+ if (!_loadBalancer.ContainsKey(message.Topic))
{
var topicRouteData = await GetRouteFor(message.Topic, false);
if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
@@ -64,10 +65,10 @@ namespace Org.Apache.Rocketmq
}
var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
- loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+ _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
}
- var publishLB = loadBalancer[message.Topic];
+ var publishLb = _loadBalancer[message.Topic];
var request = new rmq::SendMessageRequest();
var entry = new rmq::Message();
@@ -85,6 +86,17 @@ namespace Org.Apache.Rocketmq
entry.SystemProperties = new rmq::SystemProperties();
entry.SystemProperties.MessageId = message.MessageId;
+ entry.SystemProperties.MessageType = rmq::MessageType.Normal;
+ if (DateTime.MinValue != message.DeliveryTimestamp)
+ {
+ entry.SystemProperties.MessageType = rmq::MessageType.Delay;
+ entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+ } else if (!String.IsNullOrEmpty(message.MessageGroup))
+ {
+ entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
+ entry.SystemProperties.MessageGroup = message.MessageGroup;
+ }
+
if (!string.IsNullOrEmpty(message.Tag))
{
entry.SystemProperties.Tag = message.Tag;
@@ -98,9 +110,8 @@ namespace Org.Apache.Rocketmq
}
}
- // string target = "https://";
List<string> targets = new List<string>();
- List<rmq::MessageQueue> candidates = publishLB.select(message.MaxAttemptTimes);
+ List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
foreach (var messageQueue in candidates)
{
targets.Add(Utilities.TargetUrl(messageQueue));
@@ -140,7 +151,6 @@ namespace Org.Apache.Rocketmq
throw new Exception("Send message failed");
}
- private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
- private static new readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
index 5410b5a..7d258b4 100644
--- a/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -42,10 +42,10 @@ namespace Org.Apache.Rocketmq
this._messageQueues.Sort(Utilities.CompareMessageQueue);
Random random = new Random();
- this.roundRobinIndex = random.Next(0, this._messageQueues.Count);
+ this._roundRobinIndex = random.Next(0, this._messageQueues.Count);
}
- public void update(TopicRouteData route)
+ public void Update(TopicRouteData route)
{
List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
foreach (var partition in route.MessageQueues)
@@ -68,7 +68,7 @@ namespace Org.Apache.Rocketmq
/**
* Accept a partition iff its broker is different.
*/
- private bool accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
+ private bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
{
if (0 == existing.Count)
{
@@ -85,7 +85,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- public List<rmq::MessageQueue> select(int maxAttemptTimes)
+ public List<rmq::MessageQueue> Select(int maxAttemptTimes)
{
List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
@@ -94,13 +94,13 @@ namespace Org.Apache.Rocketmq
{
return result;
}
- int start = ++roundRobinIndex;
+ int start = ++_roundRobinIndex;
int found = 0;
for (int i = 0; i < all.Count; i++)
{
int idx = ((start + i) & int.MaxValue) % all.Count;
- if (accept(result, all[idx]))
+ if (Accept(result, all[idx]))
{
result.Add(all[idx]);
if (++found >= maxAttemptTimes)
@@ -115,6 +115,6 @@ namespace Org.Apache.Rocketmq
private List<rmq::MessageQueue> _messageQueues;
- private int roundRobinIndex;
+ private int _roundRobinIndex;
}
}
\ No newline at end of file
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index faf0f45..53f2ce1 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -38,11 +38,14 @@ namespace Org.Apache.Rocketmq
}
[TestMethod]
- public async Task testSendMessage()
+ public async Task TestSendMessage()
{
- var accessPoint = new AccessPoint();
- accessPoint.Host = host;
- accessPoint.Port = port;
+ var accessPoint = new AccessPoint
+ {
+ Host = HOST,
+ Port = PORT
+ };
+
var producer = new Producer(accessPoint, resourceNamespace);
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
@@ -60,8 +63,8 @@ namespace Org.Apache.Rocketmq
private static string topic = "cpp_sdk_standard";
private static ICredentialsProvider credentialsProvider;
- private static string host = "11.166.42.94";
- private static int port = 8081;
+ private static string HOST = "127.0.0.1";
+ private static int PORT = 8081;
}
}
\ No newline at end of file