You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 06:23:53 UTC

[rocketmq-client-csharp] branch observability updated: Adjust message

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new 32f21f6  Adjust message
32f21f6 is described below

commit 32f21f65c583cf8723670a71b2834dcf7dddf7c2
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 14:23:44 2022 +0800

    Adjust message
---
 rocketmq-client-csharp/ClientManager.cs       | 10 +--
 rocketmq-client-csharp/Message.cs             | 91 +++++++++++++--------------
 rocketmq-client-csharp/Producer.cs            | 26 +++++---
 rocketmq-client-csharp/PublishLoadBalancer.cs | 14 ++---
 tests/ProducerTest.cs                         | 15 +++--
 5 files changed, 84 insertions(+), 72 deletions(-)

diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 1168799..a79cbe3 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -219,7 +219,7 @@ namespace Org.Apache.Rocketmq
         {
             var msg = new Message();
             msg.Topic = message.Topic.Name;
-            msg.messageId = message.SystemProperties.MessageId;
+            msg.MessageId = message.SystemProperties.MessageId;
             msg.Tag = message.SystemProperties.Tag;
 
             // Validate message body checksum
@@ -229,7 +229,7 @@ namespace Org.Apache.Rocketmq
                 uint checksum = Force.Crc32.Crc32Algorithm.Compute(raw, 0, raw.Length);
                 if (!message.SystemProperties.BodyDigest.Checksum.Equals(checksum.ToString("X")))
                 {
-                    msg._bodyChecksumVerified = false;
+                    msg._checksumVerifiedOk = false;
                 }
             }
             else if (rmq::DigestType.Md5 == message.SystemProperties.BodyDigest.Type)
@@ -237,7 +237,7 @@ namespace Org.Apache.Rocketmq
                 var checksum = MD5.HashData(raw);
                 if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
                 {
-                    msg._bodyChecksumVerified = false;
+                    msg._checksumVerifiedOk = false;
                 }
             }
             else if (rmq::DigestType.Sha1 == message.SystemProperties.BodyDigest.Type)
@@ -245,7 +245,7 @@ namespace Org.Apache.Rocketmq
                 var checksum = SHA1.HashData(raw);
                 if (!message.SystemProperties.BodyDigest.Checksum.Equals(System.Convert.ToHexString(checksum)))
                 {
-                    msg._bodyChecksumVerified = false;
+                    msg._checksumVerifiedOk = false;
                 }
             }
 
@@ -262,7 +262,7 @@ namespace Org.Apache.Rocketmq
                 msg.Keys.Add(key);
             }
 
-            msg._deliveryAttempt = message.SystemProperties.DeliveryAttempt;
+            msg.DeliveryAttempt = message.SystemProperties.DeliveryAttempt;
 
             if (message.SystemProperties.BodyEncoding == rmq::Encoding.Gzip)
             {
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index b8b0e98..2671a9d 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Collections.Generic;
 namespace Org.Apache.Rocketmq
 {
@@ -33,82 +34,80 @@ namespace Org.Apache.Rocketmq
 
         public Message(string topic, string tag, List<string> keys, byte[] body)
         {
-            this.messageId = SequenceGenerator.Instance.Next();
-            this.maxAttemptTimes = 3;
-            this.topic = topic;
-            this.tag = tag;
-            this.keys = keys;
-            this.body = body;
-            this.userProperties = new Dictionary<string, string>();
-            this.systemProperties = new Dictionary<string, string>();
+            MessageId = SequenceGenerator.Instance.Next();
+            MaxAttemptTimes = 3;
+            Topic = topic;
+            Tag = tag;
+            Keys = keys;
+            Body = body;
+            UserProperties = new Dictionary<string, string>();
         }
 
-        internal string messageId;
         public string MessageId
         {
-            get { return messageId; }
+            get;
+            internal set;
         }
-
-        internal string _receiptHandle;
-        internal string _sourceHost;
-
-        internal int _deliveryAttempt;
-        public int DeliveryAttempt
-        {
-            get { return _deliveryAttempt; }
-        }
-
-        internal bool _bodyChecksumVerified = true;
-
-        private string topic;
-
+        
         public string Topic
         {
-            get { return topic; }
-            set { this.topic = value; }
+            get;
+            set;
         }
 
-        private byte[] body;
         public byte[] Body
         {
-            get { return body; }
-            set { this.body = value; }
+            get;
+            set;
         }
 
-        private string tag;
         public string Tag
         {
-            get { return tag; }
-            set { this.tag = value; }
+            get;
+            set;
         }
 
-        private List<string> keys;
         public List<string> Keys
         {
-            get { return keys; }
-            set { this.keys = value; }
+            get;
+            set;
         }
 
-        private Dictionary<string, string> userProperties;
         public Dictionary<string, string> UserProperties
         {
-            get { return userProperties; }
-            set { this.userProperties = value; }
+            get;
+            set;
         }
 
-        private Dictionary<string, string> systemProperties;
-        internal Dictionary<string, string> SystemProperties
+        public int MaxAttemptTimes
         {
-            get { return systemProperties; }
-            set { this.systemProperties = value; }
+            get;
+            set;
         }
 
-        private int maxAttemptTimes;
-        public int MaxAttemptTimes
+        private DateTime _deliveryTimestamp = DateTime.MinValue;
+
+        public DateTime DeliveryTimestamp
+        {
+            get => _deliveryTimestamp;
+            set => _deliveryTimestamp = value;
+        }
+        
+        public int DeliveryAttempt
         {
-            get { return maxAttemptTimes; }
-            set { maxAttemptTimes = value; }
+            get;
+            internal set;
         }
+        
+        public string MessageGroup
+        {
+            get;
+            set;
+        }
+
+        internal bool _checksumVerifiedOk = true;
+        internal string _receiptHandle;
+        internal string _sourceHost;
     }
 
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 452f699..60a0979 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -21,6 +21,7 @@ using rmq = Apache.Rocketmq.V2;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
 using Grpc.Core;
 using NLog;
 
@@ -30,7 +31,7 @@ namespace Org.Apache.Rocketmq
     {
         public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
         {
-            this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
         }
 
         public override async Task Start()
@@ -54,7 +55,7 @@ namespace Org.Apache.Rocketmq
 
         public async Task<SendReceipt> Send(Message message)
         {
-            if (!loadBalancer.ContainsKey(message.Topic))
+            if (!_loadBalancer.ContainsKey(message.Topic))
             {
                 var topicRouteData = await GetRouteFor(message.Topic, false);
                 if (null == topicRouteData || null == topicRouteData.MessageQueues || 0 == topicRouteData.MessageQueues.Count)
@@ -64,10 +65,10 @@ namespace Org.Apache.Rocketmq
                 }
 
                 var loadBalancerItem = new PublishLoadBalancer(topicRouteData);
-                loadBalancer.TryAdd(message.Topic, loadBalancerItem);
+                _loadBalancer.TryAdd(message.Topic, loadBalancerItem);
             }
 
-            var publishLB = loadBalancer[message.Topic];
+            var publishLb = _loadBalancer[message.Topic];
 
             var request = new rmq::SendMessageRequest();
             var entry = new rmq::Message();
@@ -85,6 +86,17 @@ namespace Org.Apache.Rocketmq
 
             entry.SystemProperties = new rmq::SystemProperties();
             entry.SystemProperties.MessageId = message.MessageId;
+            entry.SystemProperties.MessageType = rmq::MessageType.Normal;
+            if (DateTime.MinValue != message.DeliveryTimestamp)
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Delay;
+                entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+            } else if (!String.IsNullOrEmpty(message.MessageGroup))
+            {
+                entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
+                entry.SystemProperties.MessageGroup = message.MessageGroup;
+            }
+            
             if (!string.IsNullOrEmpty(message.Tag))
             {
                 entry.SystemProperties.Tag = message.Tag;
@@ -98,9 +110,8 @@ namespace Org.Apache.Rocketmq
                 }
             }
 
-            // string target = "https://";
             List<string> targets = new List<string>();
-            List<rmq::MessageQueue> candidates = publishLB.select(message.MaxAttemptTimes);
+            List<rmq::MessageQueue> candidates = publishLb.Select(message.MaxAttemptTimes);
             foreach (var messageQueue in candidates)
             {
                 targets.Add(Utilities.TargetUrl(messageQueue));
@@ -140,7 +151,6 @@ namespace Org.Apache.Rocketmq
             throw new Exception("Send message failed");
         }
 
-        private ConcurrentDictionary<string, PublishLoadBalancer> loadBalancer;
-        private static new readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/PublishLoadBalancer.cs b/rocketmq-client-csharp/PublishLoadBalancer.cs
index 5410b5a..7d258b4 100644
--- a/rocketmq-client-csharp/PublishLoadBalancer.cs
+++ b/rocketmq-client-csharp/PublishLoadBalancer.cs
@@ -42,10 +42,10 @@ namespace Org.Apache.Rocketmq
 
             this._messageQueues.Sort(Utilities.CompareMessageQueue);
             Random random = new Random();
-            this.roundRobinIndex = random.Next(0, this._messageQueues.Count);
+            this._roundRobinIndex = random.Next(0, this._messageQueues.Count);
         }
 
-        public void update(TopicRouteData route)
+        public void Update(TopicRouteData route)
         {
             List<rmq::MessageQueue> partitions = new List<rmq::MessageQueue>();
             foreach (var partition in route.MessageQueues)
@@ -68,7 +68,7 @@ namespace Org.Apache.Rocketmq
         /**
          * Accept a partition iff its broker is different.
          */
-        private bool accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
+        private bool Accept(List<rmq::MessageQueue> existing, rmq::MessageQueue messageQueue)
         {
             if (0 == existing.Count)
             {
@@ -85,7 +85,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public List<rmq::MessageQueue> select(int maxAttemptTimes)
+        public List<rmq::MessageQueue> Select(int maxAttemptTimes)
         {
             List<rmq::MessageQueue> result = new List<rmq::MessageQueue>();
 
@@ -94,13 +94,13 @@ namespace Org.Apache.Rocketmq
             {
                 return result;
             }
-            int start = ++roundRobinIndex;
+            int start = ++_roundRobinIndex;
             int found = 0;
 
             for (int i = 0; i < all.Count; i++)
             {
                 int idx = ((start + i) & int.MaxValue) % all.Count;
-                if (accept(result, all[idx]))
+                if (Accept(result, all[idx]))
                 {
                     result.Add(all[idx]);
                     if (++found >= maxAttemptTimes)
@@ -115,6 +115,6 @@ namespace Org.Apache.Rocketmq
 
         private List<rmq::MessageQueue> _messageQueues;
 
-        private int roundRobinIndex;
+        private int _roundRobinIndex;
     }
 }
\ No newline at end of file
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index faf0f45..53f2ce1 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -38,11 +38,14 @@ namespace Org.Apache.Rocketmq
         }
 
         [TestMethod]
-        public async Task testSendMessage()
+        public async Task TestSendMessage()
         {
-            var accessPoint = new AccessPoint();
-            accessPoint.Host = host;
-            accessPoint.Port = port;
+            var accessPoint = new AccessPoint
+            {
+                Host = HOST,
+                Port = PORT
+            };
+            
             var producer = new Producer(accessPoint, resourceNamespace);
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
@@ -60,8 +63,8 @@ namespace Org.Apache.Rocketmq
         private static string topic = "cpp_sdk_standard";
 
         private static ICredentialsProvider credentialsProvider;
-        private static string host = "11.166.42.94";
-        private static int port = 8081;
+        private static string HOST = "127.0.0.1";
+        private static int PORT = 8081;
     }
 
 }
\ No newline at end of file