You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 06:57:17 UTC

[rocketmq-client-csharp] branch observability updated: Add unit tests for Producer.Send

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new 9ba3dc7  Add unit tests for Producer.Send
9ba3dc7 is described below

commit 9ba3dc7f4352a17384823ac634bc413f6798df3c
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 14:57:08 2022 +0800

    Add unit tests for Producer.Send
---
 rocketmq-client-csharp/ClientManager.cs    |  10 +--
 rocketmq-client-csharp/Message.cs          |  17 ++++-
 rocketmq-client-csharp/MessageException.cs |  29 ++++++++
 rocketmq-client-csharp/Producer.cs         |   6 ++
 tests/ProducerTest.cs                      | 112 ++++++++++++++++++++++++++---
 5 files changed, 155 insertions(+), 19 deletions(-)

diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index a79cbe3..0b03886 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -84,15 +84,15 @@ namespace Org.Apache.Rocketmq
             rmq::QueryRouteRequest request, TimeSpan timeout)
         {
             var rpcClient = GetRpcClient(target);
-            Logger.Debug($"QueryRouteRequest: {request.ToString()}");
+            Logger.Debug($"QueryRouteRequest: {request}");
             var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
 
             if (queryRouteResponse.Status.Code != rmq::Code.Ok)
             {
-                Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status.ToString()}");
+                Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status}");
                 // Raise an application layer exception
             }
-            Logger.Debug($"QueryRouteResponse: {queryRouteResponse.ToString()}");
+            Logger.Debug($"QueryRouteResponse: {queryRouteResponse}");
 
             var messageQueues = new List<rmq::MessageQueue>();
             foreach (var messageQueue in queryRouteResponse.MessageQueues)
@@ -107,9 +107,9 @@ namespace Org.Apache.Rocketmq
             TimeSpan timeout)
         {
             var rpcClient = GetRpcClient(target);
-            Logger.Debug($"Heartbeat to {target}, Request: {request.ToString()}");
+            Logger.Debug($"Heartbeat to {target}, Request: {request}");
             var response = await rpcClient.Heartbeat(metadata, request, timeout);
-            Logger.Debug($"Heartbeat to {target} response status: {response.Status.ToString()}");
+            Logger.Debug($"Heartbeat to {target} response status: {response.Status}");
             return response.Status.Code == rmq::Code.Ok;
         }
 
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 2671a9d..b527311 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+
 namespace Org.Apache.Rocketmq
 {
 
@@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq
             Keys = keys;
             Body = body;
             UserProperties = new Dictionary<string, string>();
+            DeliveryTimestamp = DateTime.MinValue;
         }
 
         public string MessageId
@@ -85,12 +87,11 @@ namespace Org.Apache.Rocketmq
             set;
         }
 
-        private DateTime _deliveryTimestamp = DateTime.MinValue;
 
         public DateTime DeliveryTimestamp
         {
-            get => _deliveryTimestamp;
-            set => _deliveryTimestamp = value;
+            get;
+            set;
         }
         
         public int DeliveryAttempt
@@ -104,6 +105,16 @@ namespace Org.Apache.Rocketmq
             get;
             set;
         }
+        
+        public bool Fifo()
+        {
+            return !String.IsNullOrEmpty(MessageGroup);
+        }
+
+        public bool Scheduled()
+        {
+            return DeliveryTimestamp > DateTime.UtcNow;
+        }
 
         internal bool _checksumVerifiedOk = true;
         internal string _receiptHandle;
diff --git a/rocketmq-client-csharp/MessageException.cs b/rocketmq-client-csharp/MessageException.cs
new file mode 100644
index 0000000..7ef10df
--- /dev/null
+++ b/rocketmq-client-csharp/MessageException.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Rocketmq
+{
+    [Serializable]
+    public class MessageException : Exception
+    {
+        public MessageException(string message) : base(message)
+        {
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 60a0979..40d62a0 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -91,6 +91,12 @@ namespace Org.Apache.Rocketmq
             {
                 entry.SystemProperties.MessageType = rmq::MessageType.Delay;
                 entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+
+                if (message.Fifo())
+                {
+                    Logger.Warn("A message may not be FIFO and delayed at the same time");
+                    throw new MessageException("A message may not be both FIFO and Timed");
+                }
             } else if (!String.IsNullOrEmpty(message.MessageGroup))
             {
                 entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 53f2ce1..13d1712 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -16,53 +16,143 @@
  */
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using System;
+using System.Collections.Generic;
 using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
 
-namespace Org.Apache.Rocketmq
-{
 
+namespace tests
+{
     [TestClass]
     public class ProducerTest
     {
 
+        private static AccessPoint _accessPoint;
+
         [ClassInitialize]
         public static void SetUp(TestContext context)
         {
-            credentialsProvider = new ConfigFileCredentialsProvider();
+            _accessPoint = new AccessPoint
+            {
+                Host = HOST,
+                Port = PORT
+            };
         }
 
         [ClassCleanup]
         public static void TearDown()
         {
+        }
 
+        [TestMethod]
+        public async Task TestLifecycle()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            await producer.Shutdown();
         }
 
         [TestMethod]
-        public async Task TestSendMessage()
+        public async Task TestSendStandardMessage()
         {
-            var accessPoint = new AccessPoint
-            {
-                Host = HOST,
-                Port = PORT
-            };
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            
+            // Tag the massage. A message has at most one tag.
+            msg.Tag = "Tag-0";
+            
+            // Associate the message with one or multiple keys
+            var keys = new List<string>();
+            keys.Add("k1");
+            keys.Add("k2");
+            msg.Keys = keys;
+            
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendFifoMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
             
-            var producer = new Producer(accessPoint, resourceNamespace);
+            // Messages of the same group will get delivered one after another. 
+            msg.MessageGroup = "message-group-0";
+            
+            // Verify messages are FIFO iff their message group is not null or empty.
+            Assert.IsTrue(msg.Fifo());
+
+            var sendResult = await producer.Send(msg);
+            Assert.IsNotNull(sendResult);
+            await producer.Shutdown();
+        }
+        
+        [TestMethod]
+        public async Task TestSendScheduledMessage()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
             producer.CredentialsProvider = new ConfigFileCredentialsProvider();
             producer.Region = "cn-hangzhou-pre";
             await producer.Start();
             byte[] body = new byte[1024];
             Array.Fill(body, (byte)'x');
             var msg = new Message(topic, body);
+            
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            Assert.IsTrue(msg.Scheduled());
+            
             var sendResult = await producer.Send(msg);
             Assert.IsNotNull(sendResult);
             await producer.Shutdown();
         }
+        
+        
+        /**
+         * Trying send a message that is both FIFO and Scheduled should fail.
+         */
+        [TestMethod]
+        public async Task TestSendMessage_Failure()
+        {
+            var producer = new Producer(_accessPoint, resourceNamespace);
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            await producer.Start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            msg.MessageGroup = "Group-0";
+            msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+            Assert.IsTrue(msg.Scheduled());
+
+            try
+            {
+                await producer.Send(msg);
+                Assert.Fail("Should have raised an exception");
+            }
+            catch (MessageException e)
+            {
+            }
+            await producer.Shutdown();
+        }
 
         private static string resourceNamespace = "";
 
         private static string topic = "cpp_sdk_standard";
 
-        private static ICredentialsProvider credentialsProvider;
         private static string HOST = "127.0.0.1";
         private static int PORT = 8081;
     }