You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 06:57:17 UTC
[rocketmq-client-csharp] branch observability updated: Add unit tests for Producer.Send
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new 9ba3dc7 Add unit tests for Producer.Send
9ba3dc7 is described below
commit 9ba3dc7f4352a17384823ac634bc413f6798df3c
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 14:57:08 2022 +0800
Add unit tests for Producer.Send
---
rocketmq-client-csharp/ClientManager.cs | 10 +--
rocketmq-client-csharp/Message.cs | 17 ++++-
rocketmq-client-csharp/MessageException.cs | 29 ++++++++
rocketmq-client-csharp/Producer.cs | 6 ++
tests/ProducerTest.cs | 112 ++++++++++++++++++++++++++---
5 files changed, 155 insertions(+), 19 deletions(-)
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index a79cbe3..0b03886 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -84,15 +84,15 @@ namespace Org.Apache.Rocketmq
rmq::QueryRouteRequest request, TimeSpan timeout)
{
var rpcClient = GetRpcClient(target);
- Logger.Debug($"QueryRouteRequest: {request.ToString()}");
+ Logger.Debug($"QueryRouteRequest: {request}");
var queryRouteResponse = await rpcClient.QueryRoute(metadata, request, timeout);
if (queryRouteResponse.Status.Code != rmq::Code.Ok)
{
- Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status.ToString()}");
+ Logger.Warn($"Failed to query route entries for topic={request.Topic.Name} from {target}: {queryRouteResponse.Status}");
// Raise an application layer exception
}
- Logger.Debug($"QueryRouteResponse: {queryRouteResponse.ToString()}");
+ Logger.Debug($"QueryRouteResponse: {queryRouteResponse}");
var messageQueues = new List<rmq::MessageQueue>();
foreach (var messageQueue in queryRouteResponse.MessageQueues)
@@ -107,9 +107,9 @@ namespace Org.Apache.Rocketmq
TimeSpan timeout)
{
var rpcClient = GetRpcClient(target);
- Logger.Debug($"Heartbeat to {target}, Request: {request.ToString()}");
+ Logger.Debug($"Heartbeat to {target}, Request: {request}");
var response = await rpcClient.Heartbeat(metadata, request, timeout);
- Logger.Debug($"Heartbeat to {target} response status: {response.Status.ToString()}");
+ Logger.Debug($"Heartbeat to {target} response status: {response.Status}");
return response.Status.Code == rmq::Code.Ok;
}
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 2671a9d..b527311 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+
namespace Org.Apache.Rocketmq
{
@@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq
Keys = keys;
Body = body;
UserProperties = new Dictionary<string, string>();
+ DeliveryTimestamp = DateTime.MinValue;
}
public string MessageId
@@ -85,12 +87,11 @@ namespace Org.Apache.Rocketmq
set;
}
- private DateTime _deliveryTimestamp = DateTime.MinValue;
public DateTime DeliveryTimestamp
{
- get => _deliveryTimestamp;
- set => _deliveryTimestamp = value;
+ get;
+ set;
}
public int DeliveryAttempt
@@ -104,6 +105,16 @@ namespace Org.Apache.Rocketmq
get;
set;
}
+
+ public bool Fifo()
+ {
+ return !String.IsNullOrEmpty(MessageGroup);
+ }
+
+ public bool Scheduled()
+ {
+ return DeliveryTimestamp > DateTime.UtcNow;
+ }
internal bool _checksumVerifiedOk = true;
internal string _receiptHandle;
diff --git a/rocketmq-client-csharp/MessageException.cs b/rocketmq-client-csharp/MessageException.cs
new file mode 100644
index 0000000..7ef10df
--- /dev/null
+++ b/rocketmq-client-csharp/MessageException.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Org.Apache.Rocketmq
+{
+ [Serializable]
+ public class MessageException : Exception
+ {
+ public MessageException(string message) : base(message)
+ {
+ }
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 60a0979..40d62a0 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -91,6 +91,12 @@ namespace Org.Apache.Rocketmq
{
entry.SystemProperties.MessageType = rmq::MessageType.Delay;
entry.SystemProperties.DeliveryTimestamp = Timestamp.FromDateTime(message.DeliveryTimestamp);
+
+ if (message.Fifo())
+ {
+ Logger.Warn("A message may not be FIFO and delayed at the same time");
+ throw new MessageException("A message may not be both FIFO and Timed");
+ }
} else if (!String.IsNullOrEmpty(message.MessageGroup))
{
entry.SystemProperties.MessageType = rmq::MessageType.Fifo;
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index 53f2ce1..13d1712 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -16,53 +16,143 @@
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
-namespace Org.Apache.Rocketmq
-{
+namespace tests
+{
[TestClass]
public class ProducerTest
{
+ private static AccessPoint _accessPoint;
+
[ClassInitialize]
public static void SetUp(TestContext context)
{
- credentialsProvider = new ConfigFileCredentialsProvider();
+ _accessPoint = new AccessPoint
+ {
+ Host = HOST,
+ Port = PORT
+ };
}
[ClassCleanup]
public static void TearDown()
{
+ }
+ [TestMethod]
+ public async Task TestLifecycle()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ await producer.Shutdown();
}
[TestMethod]
- public async Task TestSendMessage()
+ public async Task TestSendStandardMessage()
{
- var accessPoint = new AccessPoint
- {
- Host = HOST,
- Port = PORT
- };
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
+
+ // Tag the message. A message has at most one tag.
+ msg.Tag = "Tag-0";
+
+ // Associate the message with one or multiple keys
+ var keys = new List<string>();
+ keys.Add("k1");
+ keys.Add("k2");
+ msg.Keys = keys;
+
+ var sendResult = await producer.Send(msg);
+ Assert.IsNotNull(sendResult);
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendFifoMessage()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
- var producer = new Producer(accessPoint, resourceNamespace);
+ // Messages of the same group will get delivered one after another.
+ msg.MessageGroup = "message-group-0";
+
+ // Verify messages are FIFO iff their message group is not null or empty.
+ Assert.IsTrue(msg.Fifo());
+
+ var sendResult = await producer.Send(msg);
+ Assert.IsNotNull(sendResult);
+ await producer.Shutdown();
+ }
+
+ [TestMethod]
+ public async Task TestSendScheduledMessage()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
producer.CredentialsProvider = new ConfigFileCredentialsProvider();
producer.Region = "cn-hangzhou-pre";
await producer.Start();
byte[] body = new byte[1024];
Array.Fill(body, (byte)'x');
var msg = new Message(topic, body);
+
+ msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ Assert.IsTrue(msg.Scheduled());
+
var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
await producer.Shutdown();
}
+
+
+ /**
+ * Trying to send a message that is both FIFO and Scheduled should fail.
+ */
+ [TestMethod]
+ public async Task TestSendMessage_Failure()
+ {
+ var producer = new Producer(_accessPoint, resourceNamespace);
+ producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+ producer.Region = "cn-hangzhou-pre";
+ await producer.Start();
+ byte[] body = new byte[1024];
+ Array.Fill(body, (byte)'x');
+ var msg = new Message(topic, body);
+ msg.MessageGroup = "Group-0";
+ msg.DeliveryTimestamp = DateTime.UtcNow + TimeSpan.FromSeconds(10);
+ Assert.IsTrue(msg.Scheduled());
+
+ try
+ {
+ await producer.Send(msg);
+ Assert.Fail("Should have raised an exception");
+ }
+ catch (MessageException e)
+ {
+ }
+ await producer.Shutdown();
+ }
private static string resourceNamespace = "";
private static string topic = "cpp_sdk_standard";
- private static ICredentialsProvider credentialsProvider;
private static string HOST = "127.0.0.1";
private static int PORT = 8081;
}