You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/23 08:15:11 UTC
[rocketmq-e2e] branch master updated: add csharp e2e test case (#12)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git
The following commit(s) were added to refs/heads/master by this push:
new 4e08da1 add csharp e2e test case (#12)
4e08da1 is described below
commit 4e08da19a4d68683194bcadfe44d35cddeab68f7
Author: rook1ewang <16...@users.noreply.github.com>
AuthorDate: Thu Mar 23 16:15:04 2023 +0800
add csharp e2e test case (#12)
Co-authored-by: ww269266 <ww...@alibaba-inc.com>
---
csharp/.gitignore | 3 +
csharp/README.md | 50 ++++
csharp/bin/run.sh | 24 ++
csharp/rocketmq-client-csharp-tests.sln | 34 +++
.../rocketmq-client-csharp-tests.csproj | 25 ++
.../rocketmq-client-csharp-tests/test/BaseTest.cs | 32 +++
.../test/DelayMsgTest.cs | 308 +++++++++++++++++++++
.../test/FifoMsgTest.cs | 191 +++++++++++++
.../test/NormalMsgTest.cs | 177 ++++++++++++
.../test/TransMsgTest.cs | 305 ++++++++++++++++++++
.../utils/ClientUtils.cs | 46 +++
.../utils/MQAdminUtils.cs | 180 ++++++++++++
.../utils/NameUtils.cs | 48 ++++
13 files changed, 1423 insertions(+)
diff --git a/csharp/.gitignore b/csharp/.gitignore
new file mode 100644
index 0000000..e9dd4ab
--- /dev/null
+++ b/csharp/.gitignore
@@ -0,0 +1,3 @@
+rocketmq-client-csharp-tests/bin
+rocketmq-client-csharp-tests/obj
+.idea
\ No newline at end of file
diff --git a/csharp/README.md b/csharp/README.md
new file mode 100644
index 0000000..bd7145f
--- /dev/null
+++ b/csharp/README.md
@@ -0,0 +1,50 @@
+
+## Apache RocketMQ E2E
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+
+RocketMQ Csharp E2E Test
+
+### Test Case Coverage
+* Message Type
+ * Normal message
+ * Transaction message
+ * Order message
+ * Delay message
+* Producer
+ * Sync Send
+ * Async Send
+* **PushConsumer (not yet implemented in the SDK)**
+* SimpleConsumer
+ * Order/Delay/Transaction/Normal
+ * Sync receive/**Async receive (not yet implemented in the SDK)**
+ * Sync ack/Async ack
+* Client init (Producer/SimpleConsumer/**PushConsumer (not yet implemented in the SDK)**)
+ * Parameter settings
+* Model
+ * broadcast
+ * cluster
+* Message
+ * Tag
+ * Body
+ * Key
+ * User property
+* Filter
+ * Tag
+ * Sql
+* Retry
+ * Normal message
+ * Order message
+
+#### How to start
+```angular2html
+# The nameserver, endpoint, broker and cluster name are derived from the ALL_IP environment variable; see common/bin/env.sh for details.
+# cd project and run csharp e2e test case
+cd csharp && sh bin/run.sh
+```
+##### Options
+* `ALL_IP` : required, set by GitHub actions
+* `cluster`: not required, default `DefaultCluster`
+
+#### how to replace csharp client sdk version
+```angular2html
+# cd rocketmq-client-csharp-tests && dotnet add package RocketMQ.Client --version 5.0.1
+```
\ No newline at end of file
diff --git a/csharp/bin/run.sh b/csharp/bin/run.sh
new file mode 100644
index 0000000..96d6eed
--- /dev/null
+++ b/csharp/bin/run.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Build the shared mqadmin utilities under ../common that the e2e tests rely on (paths assume csharp/ is the working directory).
+cd ../common && mvn -Prelease -DskipTests clean package -U
+# Load the mqadmin env into the current shell; use POSIX `.` because `source` is a bashism missing from plain /bin/sh (e.g. dash).
+cd ../rocketmq-admintools && . bin/env.sh
+# Run the C# e2e test cases.
+cd ../csharp/rocketmq-client-csharp-tests/ && dotnet test -l "console;verbosity=detailed"
diff --git a/csharp/rocketmq-client-csharp-tests.sln b/csharp/rocketmq-client-csharp-tests.sln
new file mode 100644
index 0000000..fb34cfa
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests.sln
@@ -0,0 +1,34 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.30114.105
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "rocketmq-client-csharp-tests", "rocketmq-client-csharp-tests\rocketmq-client-csharp-tests.csproj", "{9F749350-A3D0-423E-AFB6-79E521C777D0}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Debug|x64 = Debug|x64
+ Debug|x86 = Debug|x86
+ Release|Any CPU = Release|Any CPU
+ Release|x64 = Release|x64
+ Release|x86 = Release|x86
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x64.Build.0 = Debug|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x86.Build.0 = Debug|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x64.ActiveCfg = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x64.Build.0 = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x86.ActiveCfg = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x86.Build.0 = Release|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ EndGlobalSection
+EndGlobal
diff --git a/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj b/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj
new file mode 100644
index 0000000..a75e880
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj
@@ -0,0 +1,25 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <TargetFramework>net7.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+
+ <IsPackable>false</IsPackable>
+ </PropertyGroup>
+
+ <ItemGroup>
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
+ <PackageReference Include="RocketMQ.Client" Version="5.1.0" />
+ <PackageReference Include="xunit" Version="2.4.1" />
+ <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ <PrivateAssets>all</PrivateAssets>
+ </PackageReference>
+ <PackageReference Include="coverlet.collector" Version="3.1.2">
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ <PrivateAssets>all</PrivateAssets>
+ </PackageReference>
+ </ItemGroup>
+
+</Project>
diff --git a/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs b/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs
new file mode 100644
index 0000000..04eac78
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Utils;
+
+namespace Rocketmq.Tests
+{
+    /// <summary>
+    /// Base class for the e2e tests: holds the number of messages each test sends
+    /// and the connection settings copied from <see cref="ClientUtils"/>.
+    /// </summary>
+    public class BaseTest
+    {
+        // Number of messages every test produces.
+        protected const int sendNum = 10;
+
+        // Credentials default to the empty string when ClientUtils supplies none.
+        protected string accessKey = !string.IsNullOrEmpty(ClientUtils.ACCESS_KEY) ? ClientUtils.ACCESS_KEY : string.Empty;
+        protected string secretKey = !string.IsNullOrEmpty(ClientUtils.SECRET_KEY) ? ClientUtils.SECRET_KEY : string.Empty;
+
+        // Addresses and cluster name are taken from ClientUtils unchanged.
+        protected string nameserver = ClientUtils.NAMESERVER;
+        protected string endpoints = ClientUtils.GRPC_ENDPOINT;
+        protected string clusterName = ClientUtils.CLUSTER_NAME;
+        protected string brokerAddr = ClientUtils.BROKER_ADDR;
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs
new file mode 100644
index 0000000..15fabd8
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System;
+
+namespace Rocketmq.Tests
+
+{
+ public class DealyMsgTest : BaseTest
+ {
+        // Sink used by the tests in this class to write their log lines.
+        private readonly ITestOutputHelper _output;
+
+        /// <summary>Keeps the supplied output helper for use by the test methods.</summary>
+        /// <param name="output">Output helper the tests log to.</param>
+        public DealyMsgTest(ITestOutputHelper output) => _output = output;
+
+        /// <summary>
+        /// Sends <c>sendNum</c> delay messages whose delivery timestamp lies five seconds in
+        /// the future, receives and acks them with a SimpleConsumer, and verifies that the
+        /// received message ids match the sent ones.
+        /// </summary>
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            MQAdminUtils.CreateDelayTopic(topic, null, clusterName, nameserver);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably this warms up the subscription so the polling loop
+            // below sees messages promptly - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes(NameUtils.RandomString(8));
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys(NameUtils.RandomString(8))
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send delay message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            // Use UTC for the deadline so it is immune to local-time (DST) adjustments and
+            // consistent with the UTC timestamps used in the log lines.
+            DateTime endTime = DateTime.UtcNow.AddSeconds(30);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        await simpleConsumer.Ack(message);
+                        recvMsgIdList.Add(message.MessageId);
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack delay message: {message}");
+                    }
+
+                    await Task.Delay(2000);
+                }
+                catch (Exception ex)
+                {
+                    // Log and keep polling until the deadline; the assertions below
+                    // fail the test if messages never arrive.
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            // xUnit's Assert.Equal takes (expected, actual).
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+
+        /// <summary>
+        /// Sends <c>sendNum</c> delay messages and acks a message only when it arrives with
+        /// <c>DeliveryAttempt == 2</c>; other deliveries are logged and left unacked. Verifies
+        /// that every sent message is eventually acked exactly once.
+        /// </summary>
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecvTwice()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            MQAdminUtils.CreateDelayTopic(topic, null, clusterName, nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably a warm-up of the subscription - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send delay message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            // UTC deadline: immune to local-time (DST) adjustments and consistent with the logs.
+            DateTime endTime = DateTime.UtcNow.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        if (message.DeliveryAttempt == 2)
+                        {
+                            await simpleConsumer.Ack(message);
+                            recvMsgIdList.Add(message.MessageId);
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack delay message: {message}");
+                        }
+                        else
+                        {
+                            // Left unacked on purpose so the message is delivered again.
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack delay message: {message}");
+                        }
+                    }
+
+                    await Task.Delay(2000);
+                }
+                catch (Exception ex)
+                {
+                    // Log and keep polling until the deadline; the assertions below
+                    // fail the test if messages never arrive.
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            // xUnit's Assert.Equal takes (expected, actual).
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+
+        /// <summary>
+        /// Sends <c>sendNum</c> delay messages and acks a message only when it arrives with
+        /// <c>DeliveryAttempt &gt; 2</c>; earlier deliveries are logged and left unacked.
+        /// The receive loop always runs for the full 90 seconds, so a message delivered
+        /// again after being acked would be recorded twice and fail the count assertion.
+        /// </summary>
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecvMore()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateDelayTopic(topic, null, clusterName, nameserver);
+
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(new Dictionary<string, FilterExpression>
+                    { { topic, new FilterExpression(tag) } })
+                .Build();
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            DateTime endTime = DateTime.UtcNow.AddSeconds(90);
+
+            while (true)
+            {
+                // No early exit on the received count: the loop deliberately polls until the deadline.
+                if (DateTime.UtcNow > endTime)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        if (message.DeliveryAttempt > 2)
+                        {
+                            await simpleConsumer.Ack(message);
+                            _output.WriteLine(
+                                $"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack message: {message}, message id: {message.MessageId}");
+                            recvMsgIdList.Add(message.MessageId);
+                        }
+                        else
+                        {
+                            // Left unacked on purpose so the message is delivered again.
+                            _output.WriteLine(
+                                $"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack message: {message}, message id: {message.MessageId}");
+                        }
+                    }
+                }
+                catch (Exception ex)
+                {
+                    // Log and keep polling; the assertions below fail the test if messages never arrive.
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+                await Task.Delay(2000);
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            // xUnit's Assert.Equal takes (expected, actual).
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs
new file mode 100644
index 0000000..f70b1ce
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System;
+
+namespace Rocketmq.Tests
+{
+ public class FifoMsgTest : BaseTest
+ {
+        // Sink used by the tests in this class to write their log lines.
+        private readonly ITestOutputHelper _output;
+
+        /// <summary>Keeps the supplied output helper for use by the test methods.</summary>
+        /// <param name="output">Output helper the tests log to.</param>
+        public FifoMsgTest(ITestOutputHelper output) => _output = output;
+
+        /// <summary>
+        /// Sends <c>sendNum</c> FIFO messages in one message group, receives and acks them
+        /// with a SimpleConsumer, and verifies that the ids were received in the same order
+        /// in which they were sent (the lists are compared without sorting).
+        /// </summary>
+        [Fact]
+        public async Task TestSendFifoMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            MQAdminUtils.CreateFIFOTopic(topic, null, clusterName, nameserver);
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably a warm-up of the subscription - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                const string messageGroup = "messageGroup1";
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .SetMessageGroup(messageGroup)
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send fifo message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            // UTC deadline: immune to local-time (DST) adjustments and consistent with the logs.
+            DateTime endTime = DateTime.UtcNow.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack fifo message: {message}");
+                    await simpleConsumer.Ack(message);
+                    recvMsgIdList.Add(message.MessageId);
+                }
+            }
+
+            // xUnit's Assert.Equal takes (expected, actual). The lists are intentionally
+            // not sorted, so the final assertion also checks ordering.
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+
+
+        /// <summary>
+        /// Sends <c>sendNum</c> FIFO messages in one message group and acks a message only
+        /// when it arrives with <c>DeliveryAttempt == 2</c>; other deliveries are logged and
+        /// left unacked. Verifies that the acked ids match the sent ids in send order
+        /// (the lists are compared without sorting).
+        /// </summary>
+        [Fact]
+        public async Task TestSendFifoMsgSyncSimpleConsumerRecvTwice()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateFIFOTopic(topic, null, clusterName, nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably a warm-up of the subscription - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                const string messageGroup = "messageGroup2";
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .SetMessageGroup(messageGroup)
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send fifo message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            // UTC deadline: immune to local-time (DST) adjustments and consistent with the logs.
+            DateTime endTime = DateTime.UtcNow.AddSeconds(30);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    if (message.DeliveryAttempt == 2)
+                    {
+                        await simpleConsumer.Ack(message);
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack fifo message: {message}");
+                        recvMsgIdList.Add(message.MessageId);
+                    }
+                    else
+                    {
+                        // Left unacked on purpose so the message is delivered again.
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack fifo message: {message}");
+                    }
+                }
+                await Task.Delay(2000);
+            }
+
+            // xUnit's Assert.Equal takes (expected, actual). The lists are intentionally
+            // not sorted, so the final assertion also checks ordering.
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs
new file mode 100644
index 0000000..2e41d04
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Collections.Generic;
+using System;
+using System.Threading.Tasks;
+
+namespace Rocketmq.Tests
+{
+ public class NormalMsgTest : BaseTest
+ {
+        // Sink used by the tests in this class to write their log lines.
+        private readonly ITestOutputHelper _output;
+
+        /// <summary>Keeps the supplied output helper for use by the test methods.</summary>
+        /// <param name="output">Output helper the tests log to.</param>
+        public NormalMsgTest(ITestOutputHelper output) => _output = output;
+
+        /// <summary>
+        /// Sends <c>sendNum</c> normal messages, receives and acks them with a
+        /// SimpleConsumer, and verifies that the received message ids match the sent ones.
+        /// </summary>
+        [Fact]
+        public async Task TestSendNormalMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateTopic(topic, null, clusterName, nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably a warm-up of the subscription - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send normal message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            // UTC deadline: immune to local-time (DST) adjustments and consistent with the logs.
+            DateTime endTime = DateTime.UtcNow.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    await simpleConsumer.Ack(message);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack normal message: {message}");
+                    recvMsgIdList.Add(message.MessageId);
+                }
+                await Task.Delay(2000);
+            }
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            // xUnit's Assert.Equal takes (expected, actual).
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendNum, recvMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+
+        /// <summary>
+        /// Sends <c>sendNum</c> normal messages and polls for 90 seconds without ever acking,
+        /// logging every delivery. Each distinct received message id is recorded once, and
+        /// the test asserts that every sent message was delivered at least once.
+        /// </summary>
+        [Fact]
+        public async Task TestSendNormalMsgSyncSimpleConsumerRecvRetry()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateTopic(topic, null, clusterName, nameserver);
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            // Receive once before anything has been produced.
+            // NOTE(review): presumably a warm-up of the subscription - confirm.
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send normal message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            DateTime endTime = DateTime.UtcNow.AddSeconds(90);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    _output.WriteLine(
+                        $"[{DateTime.UtcNow:HH:mm:ss.fff}] recv normal msg: {message}, message id: {message.MessageId}");
+
+                    // Messages are never acked, so the same id may arrive repeatedly;
+                    // record each id only once.
+                    if (!recvMsgIdList.Contains(message.MessageId))
+                    {
+                        recvMsgIdList.Add(message.MessageId);
+                    }
+                }
+                await Task.Delay(2000);
+            }
+
+            // Previously this test asserted nothing; require that every sent message
+            // was delivered at least once. xUnit's Assert.Equal takes (expected, actual).
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendNum, sendMsgIdList.Count);
+            Assert.Equal(sendMsgIdList, recvMsgIdList);
+        }
+
+ // test send with not match message type
+
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs
new file mode 100644
index 0000000..455380e
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Rocketmq.Tests
+{
+ public class TransMsgTest : BaseTest
+ {
+
+ // Output helper passed to the constructor; every test in this class logs through it.
+ private readonly ITestOutputHelper _output;
+
+ /// <summary>
+ /// Creates the test class and keeps the supplied output helper for later logging.
+ /// </summary>
+ /// <param name="output">Helper used to write diagnostic lines for the running test.</param>
+ public TransMsgTest(ITestOutputHelper output) => _output = output;
+
+ /// <summary>
+ /// Transaction checker used by the tests in this class. It records the id of every message
+ /// it is asked about and always answers with the resolution chosen at construction time.
+ /// </summary>
+ private class TransactionChecker : ITransactionChecker
+ {
+     private readonly ConcurrentBag<string> _checkedMsgIds;
+     private readonly bool _commitOnCheck;
+     private readonly ITestOutputHelper _log;
+
+     /// <param name="sendMsgIds">Bag that receives the id of each checked message.</param>
+     /// <param name="isCommit">True to answer every check with Commit, false for Rollback.</param>
+     /// <param name="output">Helper used to log each check.</param>
+     public TransactionChecker(ConcurrentBag<string> sendMsgIds, bool isCommit, ITestOutputHelper output)
+     {
+         _log = output;
+         _commitOnCheck = isCommit;
+         _checkedMsgIds = sendMsgIds;
+     }
+
+     /// <summary>
+     /// Records the message id, logs the decision and returns the preconfigured resolution.
+     /// </summary>
+     public TransactionResolution Check(MessageView messageView)
+     {
+         _checkedMsgIds.Add(messageView.MessageId);
+
+         string action;
+         TransactionResolution resolution;
+         if (_commitOnCheck)
+         {
+             action = "commit";
+             resolution = TransactionResolution.Commit;
+         }
+         else
+         {
+             action = "rollback";
+             resolution = TransactionResolution.Rollback;
+         }
+
+         _log.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Checker {action} trans msg, messageview={messageView}");
+         return resolution;
+     }
+ }
+
+ /// <summary>
+ /// Sends <c>sendNum</c> transactional messages, explicitly committing the even-indexed ones and
+ /// leaving the odd-indexed ones unresolved for the producer's registered checker (configured to
+ /// commit). Then polls a simple consumer and expects to receive exactly the ids that were
+ /// committed either way.
+ /// </summary>
+ [Fact]
+ public async Task TestSendTransHalfCommitMsgSyncSimpleConsumerRecv()
+ {
+     // Filled both by this method (explicit commits) and by the checker (checked commits).
+     ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+     List<string> recvMsgIdList = new List<string>();
+     string topic = NameUtils.GetTopicName();
+     string consumerGroup = NameUtils.GetGroupName();
+     string tag = NameUtils.RandomString(8);
+
+     var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+     var clientConfig = new ClientConfig.Builder()
+         .SetEndpoints(endpoints)
+         .SetCredentialsProvider(credentialsProvider)
+         .Build();
+
+     MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+     var subscription = new Dictionary<string, FilterExpression>
+         { { topic, new FilterExpression(tag) } };
+     await using var simpleConsumer = await new SimpleConsumer.Builder()
+         .SetClientConfig(clientConfig)
+         .SetConsumerGroup(consumerGroup)
+         .SetAwaitDuration(TimeSpan.FromSeconds(15))
+         .SetSubscriptionExpression(subscription)
+         .Build();
+     // Initial receive before anything is sent; its result is intentionally discarded.
+     // NOTE(review): presumably this warms up the subscription on the server side - confirm.
+     await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+     await using var producer = await new Producer.Builder()
+         .SetTopics(topic)
+         .SetClientConfig(clientConfig)
+         .SetTransactionChecker(new TransactionChecker(sendMsgIdList, true, _output))
+         .Build();
+
+     for (int i = 0; i < sendNum; i++)
+     {
+         var transaction = producer.BeginTransaction();
+         var bytes = Encoding.UTF8.GetBytes("foobar");
+         var message = new Message.Builder()
+             .SetTopic(topic)
+             .SetBody(bytes)
+             .SetTag(tag)
+             .SetKeys("keyA", "keyB")
+             .Build();
+
+         var sendReceipt = await producer.Send(message, transaction);
+         _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send trans message: {message}, message id: {sendReceipt.MessageId}");
+         // Only every other message is committed here; the rest stay unresolved.
+         if (i % 2 == 0)
+         {
+             transaction.Commit();
+             sendMsgIdList.Add(sendReceipt.MessageId);
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Commit trans msg, sendReceipt={sendReceipt}");
+         }
+     }
+
+     // Use UTC for the deadline so local-time adjustments cannot stretch or shrink the wait.
+     DateTime endTime = DateTime.UtcNow.AddSeconds(120);
+     while (true)
+     {
+         if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+         {
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+             break;
+         }
+         var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+         foreach (var message in messageViews)
+         {
+             await simpleConsumer.Ack(message);
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+             recvMsgIdList.Add(message.MessageId);
+         }
+         await Task.Delay(2000);
+     }
+
+     var sendMsgIds = sendMsgIdList.ToList();
+     sendMsgIds.Sort();
+     recvMsgIdList.Sort();
+     // xUnit convention: expected value first, actual value second.
+     Assert.Equal(sendNum, recvMsgIdList.Count);
+     Assert.Equal(sendMsgIds, recvMsgIdList);
+ }
+
+ /// <summary>
+ /// Sends <c>sendNum</c> transactional messages, explicitly rolling back the even-indexed ones and
+ /// leaving the odd-indexed ones unresolved for the producer's registered checker (configured to
+ /// roll back). Then polls a simple consumer for 60 seconds and expects to receive nothing.
+ /// </summary>
+ [Fact]
+ public async Task TestSendTransCheckRollbackMsgSyncSimpleConsumerRecv()
+ {
+     // Collects rolled-back ids for logging/diagnostics only; it is not asserted on.
+     ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+     List<string> recvMsgIdList = new List<string>();
+     string topic = NameUtils.GetTopicName();
+     string consumerGroup = NameUtils.GetGroupName();
+     string tag = NameUtils.RandomString(8);
+
+     var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+     var clientConfig = new ClientConfig.Builder()
+         .SetEndpoints(endpoints)
+         .SetCredentialsProvider(credentialsProvider)
+         .Build();
+
+     MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+     var subscription = new Dictionary<string, FilterExpression>
+         { { topic, new FilterExpression(tag) } };
+     await using var simpleConsumer = await new SimpleConsumer.Builder()
+         .SetClientConfig(clientConfig)
+         .SetConsumerGroup(consumerGroup)
+         .SetAwaitDuration(TimeSpan.FromSeconds(15))
+         .SetSubscriptionExpression(subscription)
+         .Build();
+     await using var producer = await new Producer.Builder()
+         .SetTopics(topic)
+         .SetClientConfig(clientConfig)
+         .SetTransactionChecker(new TransactionChecker(sendMsgIdList, false, _output))
+         .Build();
+
+     for (int i = 0; i < sendNum; i++)
+     {
+         var transaction = producer.BeginTransaction();
+         var bytes = Encoding.UTF8.GetBytes("foobar");
+         var message = new Message.Builder()
+             .SetTopic(topic)
+             .SetBody(bytes)
+             .SetTag(tag)
+             .SetKeys("keyA", "keyB")
+             .Build();
+
+         var sendReceipt = await producer.Send(message, transaction);
+         _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send trans msg, sendReceipt={sendReceipt}");
+         // Roll back every other transaction here; the rest stay unresolved.
+         if (i % 2 == 0)
+         {
+             transaction.Rollback();
+             sendMsgIdList.Add(sendReceipt.MessageId);
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Rollback trans msg, sendReceipt={sendReceipt}");
+         }
+     }
+
+     // Use UTC for the deadline so local-time adjustments cannot stretch or shrink the wait.
+     DateTime endTime = DateTime.UtcNow.AddSeconds(60);
+     while (true)
+     {
+         if (DateTime.UtcNow > endTime)
+         {
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+             break;
+         }
+         var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+         foreach (var message in messageViews)
+         {
+             await simpleConsumer.Ack(message);
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+             recvMsgIdList.Add(message.MessageId);
+         }
+         await Task.Delay(2000);
+     }
+     // No rolled-back message may ever reach the consumer.
+     Assert.Empty(recvMsgIdList);
+ }
+
+
+ /// <summary>
+ /// Sends <c>sendNum</c> transactional messages and commits each one explicitly, then polls a
+ /// simple consumer while deliberately not acknowledging a message until its delivery attempt is
+ /// greater than 2. Expects every committed message to be acknowledged in the end.
+ /// </summary>
+ [Fact]
+ public async Task TestSendTransCheckRollbackMsgSyncSimpleConsumerRecvRetrys()
+ {
+     ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+     List<string> recvMsgIdList = new List<string>();
+     string topic = NameUtils.GetTopicName();
+     string consumerGroup = NameUtils.GetGroupName();
+     string tag = NameUtils.RandomString(8);
+
+     MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+     var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+     var clientConfig = new ClientConfig.Builder()
+         .SetEndpoints(endpoints)
+         .SetCredentialsProvider(credentialsProvider)
+         .Build();
+
+     var subscription = new Dictionary<string, FilterExpression>
+         { { topic, new FilterExpression(tag) } };
+     await using var simpleConsumer = await new SimpleConsumer.Builder()
+         .SetClientConfig(clientConfig)
+         .SetConsumerGroup(consumerGroup)
+         .SetAwaitDuration(TimeSpan.FromSeconds(15))
+         .SetSubscriptionExpression(subscription)
+         .Build();
+     // Initial receive before anything is sent; its result is intentionally discarded.
+     // NOTE(review): presumably this warms up the subscription on the server side - confirm.
+     await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+     await using var producer = await new Producer.Builder()
+         .SetTopics(topic)
+         .SetClientConfig(clientConfig)
+         .SetTransactionChecker(new TransactionChecker(sendMsgIdList, false, _output))
+         .Build();
+
+     for (int i = 0; i < sendNum; i++)
+     {
+         var transaction = producer.BeginTransaction();
+         var bytes = Encoding.UTF8.GetBytes("foobar");
+         var message = new Message.Builder()
+             .SetTopic(topic)
+             .SetBody(bytes)
+             .SetTag(tag)
+             .SetKeys("keyA", "keyB")
+             .Build();
+
+         var sendReceipt = await producer.Send(message, transaction);
+         // Commit the transaction.
+         transaction.Commit();
+         sendMsgIdList.Add(sendReceipt.MessageId);
+         _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Commit trans msg, sendReceipt={sendReceipt}");
+     }
+
+     DateTime endTime = DateTime.UtcNow.AddSeconds(120);
+     while (true)
+     {
+         // ">=" rather than "==" so an unexpected extra ack ends the loop at once instead of
+         // waiting out the whole deadline; the count assertion below still catches it.
+         if (DateTime.UtcNow > endTime || recvMsgIdList.Count >= sendNum)
+         {
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+             break;
+         }
+
+         try
+         {
+             var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+             foreach (var message in messageViews)
+             {
+                 // Acknowledge only from the third delivery attempt on; earlier deliveries are
+                 // left unacknowledged on purpose so the message is delivered again.
+                 if (message.DeliveryAttempt > 2)
+                 {
+                     _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+                     await simpleConsumer.Ack(message);
+                     recvMsgIdList.Add(message.MessageId);
+                 }
+                 else
+                 {
+                     _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack trans msg trans message: {message}");
+                 }
+             }
+         }
+         catch (Exception ex)
+         {
+             // Best effort: log and keep polling until the deadline.
+             _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+         }
+
+         // Pause after every round, including failed ones, so a persistently throwing
+         // Receive/Ack cannot turn this loop into a busy spin.
+         await Task.Delay(2000);
+     }
+     var sendMsgIds = sendMsgIdList.ToList();
+     sendMsgIds.Sort();
+     recvMsgIdList.Sort();
+     // xUnit convention: expected value first, actual value second.
+     Assert.Equal(sendNum, sendMsgIdList.Count);
+     Assert.Equal(sendNum, recvMsgIdList.Count);
+     Assert.Equal(sendMsgIds, recvMsgIdList);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs
new file mode 100644
index 0000000..81a9d85
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Utils
+{
+ /// <summary>
+ /// Connection settings for the tests, each read once from the process environment.
+ /// A setting whose environment variable is not defined is <c>null</c>.
+ /// </summary>
+ public class ClientUtils
+ {
+     // Value of the NAMESERVER environment variable.
+     public static readonly string NAMESERVER = FromEnvironment("NAMESERVER");
+
+     // Value of the ACCESS_KEY environment variable.
+     public static readonly string ACCESS_KEY = FromEnvironment("ACCESS_KEY");
+
+     // Value of the SECRET_KEY environment variable.
+     public static readonly string SECRET_KEY = FromEnvironment("SECRET_KEY");
+
+     // Value of the GRPC_ENDPOINT environment variable.
+     public static readonly string GRPC_ENDPOINT = FromEnvironment("GRPC_ENDPOINT");
+
+     // Value of the BROKER_ADDR environment variable.
+     public static readonly string BROKER_ADDR = FromEnvironment("BROKER_ADDR");
+
+     // Value of the CLUSTER_NAME environment variable.
+     public static readonly string CLUSTER_NAME = FromEnvironment("CLUSTER_NAME");
+
+     // Reads one environment variable; returns null when it is not set.
+     private static string FromEnvironment(string variable)
+     {
+         return Environment.GetEnvironmentVariable(variable);
+     }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs
new file mode 100644
index 0000000..523889a
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.IO;
+
+namespace Utils
+{
+ /// <summary>
+ /// Helpers that build <c>mqadmin</c> command lines (topics, consumer groups, cluster listing)
+ /// and run them through <c>/bin/bash</c>, returning whatever the command printed to stdout.
+ /// </summary>
+ public static class MQAdminUtils
+ {
+     // How many parent directories to climb from the current working directory to reach the
+     // directory that contains "rocketmq-admintools".
+     private const int RootPathLevelsUp = 5;
+
+     // Returns the directory RootPathLevelsUp levels above the current working directory.
+     // The result is null if the working directory is not that deep.
+     private static string GetRootPath()
+     {
+         string path = Environment.CurrentDirectory;
+         for (int level = 0; level < RootPathLevelsUp; level++)
+         {
+             path = Path.GetDirectoryName(path);
+         }
+         return path;
+     }
+
+     /// <summary>
+     /// Runs <paramref name="command"/> with <c>/bin/bash -c</c>, waits for it to finish,
+     /// echoes its standard output to the console and returns it.
+     /// </summary>
+     /// <param name="command">Shell command line; null is treated as an empty command.</param>
+     /// <returns>Everything the command wrote to standard output.</returns>
+     public static string ExecuteShellCommand(string command)
+     {
+         var startInfo = new ProcessStartInfo
+         {
+             FileName = "/bin/bash", // bash only; there is no Windows (cmd) code path here
+             RedirectStandardOutput = true,
+             UseShellExecute = false,
+             CreateNoWindow = true,
+         };
+         // Pass the command as its own argument instead of splicing it between double quotes,
+         // so quotes inside the command reach bash unchanged.
+         startInfo.ArgumentList.Add("-c");
+         startInfo.ArgumentList.Add(command ?? string.Empty);
+
+         // Dispose the process so its handles and redirected pipe are released promptly.
+         using var process = new Process { StartInfo = startInfo };
+         process.Start();
+         // Drain stdout before waiting, otherwise a full pipe could block the child.
+         string output = process.StandardOutput.ReadToEnd();
+         process.WaitForExit();
+         Console.WriteLine(output);
+         return output;
+     }
+
+     // Builds "sh <root>/rocketmq-admintools/bin/mqadmin <subCommand>" followed by the
+     // -n / -b / -c options for every connection argument that is not null or empty.
+     private static string BuildAdminCommand(string subCommand, string brokerAddr, string clusterName, string nameserver)
+     {
+         // use absolute path
+         string command = "sh " + GetRootPath() + "/rocketmq-admintools/bin/mqadmin " + subCommand;
+         if (!string.IsNullOrEmpty(nameserver))
+         {
+             command += " -n " + nameserver;
+         }
+         if (!string.IsNullOrEmpty(brokerAddr))
+         {
+             command += " -b " + brokerAddr;
+         }
+         if (!string.IsNullOrEmpty(clusterName))
+         {
+             command += " -c " + clusterName;
+         }
+         return command;
+     }
+
+     // Runs "updateTopic" for a topic carrying the given message.type attribute.
+     private static string CreateTypedTopic(string topicName, string brokerAddr, string clusterName, string nameserver, string messageType)
+     {
+         string command = BuildAdminCommand("updateTopic -t " + topicName, brokerAddr, clusterName, nameserver);
+         command += " -a +message.type=" + messageType;
+         return ExecuteShellCommand(command);
+     }
+
+     /// <summary>
+     /// Runs <c>mqadmin updateTopic</c> for <paramref name="topicName"/> without a message type
+     /// attribute. The command line is printed before it is executed.
+     /// </summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string CreateTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+     {
+         string command = BuildAdminCommand("updateTopic -t " + topicName, brokerAddr, clusterName, nameserver);
+         Console.WriteLine(command);
+         return ExecuteShellCommand(command);
+     }
+
+     /// <summary>Runs <c>mqadmin updateTopic</c> with <c>+message.type=DELAY</c>.</summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string CreateDelayTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+     {
+         return CreateTypedTopic(topicName, brokerAddr, clusterName, nameserver, "DELAY");
+     }
+
+     /// <summary>Runs <c>mqadmin updateTopic</c> with <c>+message.type=FIFO</c>.</summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string CreateFIFOTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+     {
+         return CreateTypedTopic(topicName, brokerAddr, clusterName, nameserver, "FIFO");
+     }
+
+     /// <summary>Runs <c>mqadmin updateTopic</c> with <c>+message.type=TRANSACTION</c>.</summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string CreateTransactionTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+     {
+         return CreateTypedTopic(topicName, brokerAddr, clusterName, nameserver, "TRANSACTION");
+     }
+
+     /// <summary>
+     /// Runs <c>mqadmin updateSubGroup</c> for <paramref name="consumerGroup"/> with the fixed
+     /// flags <c>-s true -o true -m false -d false</c>.
+     /// </summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string CreateOrderlyConsumerGroup(string consumerGroup, string brokerAddr, string clusterName, string nameserver)
+     {
+         string command = BuildAdminCommand("updateSubGroup -g " + consumerGroup, brokerAddr, clusterName, nameserver);
+         command += " -s true -o true -m false -d false ";
+         return ExecuteShellCommand(command);
+     }
+
+     /// <summary>Runs <c>mqadmin clusterlist</c>, optionally against a specific name server.</summary>
+     /// <returns>Standard output of the mqadmin invocation.</returns>
+     public static string ClusterList(string nameserver)
+     {
+         return ExecuteShellCommand(BuildAdminCommand("clusterlist", null, null, nameserver));
+     }
+
+ }
+
+}
diff --git a/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs
new file mode 100644
index 0000000..54139bb
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+
+namespace Utils
+{
+ /// <summary>
+ /// Generates random alphanumeric names for the topics and consumer groups used by the tests.
+ /// </summary>
+ public static class NameUtils
+ {
+     private static readonly Random random = new Random();
+     // System.Random instances are not safe for concurrent use; test classes that run in
+     // parallel all share this one, so every access goes through this lock.
+     private static readonly object randomLock = new object();
+     private static readonly string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+     /// <summary>
+     /// Returns a string of <paramref name="length"/> characters drawn from A-Z, a-z and 0-9.
+     /// </summary>
+     /// <param name="length">Number of characters to generate; 0 yields an empty string.</param>
+     /// <exception cref="ArgumentOutOfRangeException">
+     /// Thrown by the underlying <see cref="StringBuilder"/> when <paramref name="length"/> is negative.
+     /// </exception>
+     public static string RandomString(int length)
+     {
+         var sb = new StringBuilder(length);
+         lock (randomLock)
+         {
+             for (int i = 0; i < length; i++)
+             {
+                 sb.Append(chars[random.Next(chars.Length)]);
+             }
+         }
+         return sb.ToString();
+     }
+
+     /// <summary>Returns "topic-" followed by 8 random alphanumeric characters.</summary>
+     public static string GetTopicName()
+     {
+         return "topic-" + RandomString(8);
+     }
+
+     /// <summary>Returns "group-" followed by 8 random alphanumeric characters.</summary>
+     public static string GetGroupName()
+     {
+         return "group-" + RandomString(8);
+     }
+ }
+}
\ No newline at end of file