You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/23 08:15:11 UTC

[rocketmq-e2e] branch master updated: add csharp e2e test case (#12)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-e2e.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e08da1  add csharp e2e test case (#12)
4e08da1 is described below

commit 4e08da19a4d68683194bcadfe44d35cddeab68f7
Author: rook1ewang <16...@users.noreply.github.com>
AuthorDate: Thu Mar 23 16:15:04 2023 +0800

    add csharp e2e test case (#12)
    
    Co-authored-by: ww269266 <ww...@alibaba-inc.com>
---
 csharp/.gitignore                                  |   3 +
 csharp/README.md                                   |  50 ++++
 csharp/bin/run.sh                                  |  24 ++
 csharp/rocketmq-client-csharp-tests.sln            |  34 +++
 .../rocketmq-client-csharp-tests.csproj            |  25 ++
 .../rocketmq-client-csharp-tests/test/BaseTest.cs  |  32 +++
 .../test/DelayMsgTest.cs                           | 308 +++++++++++++++++++++
 .../test/FifoMsgTest.cs                            | 191 +++++++++++++
 .../test/NormalMsgTest.cs                          | 177 ++++++++++++
 .../test/TransMsgTest.cs                           | 305 ++++++++++++++++++++
 .../utils/ClientUtils.cs                           |  46 +++
 .../utils/MQAdminUtils.cs                          | 180 ++++++++++++
 .../utils/NameUtils.cs                             |  48 ++++
 13 files changed, 1423 insertions(+)

diff --git a/csharp/.gitignore b/csharp/.gitignore
new file mode 100644
index 0000000..e9dd4ab
--- /dev/null
+++ b/csharp/.gitignore
@@ -0,0 +1,3 @@
+rocketmq-client-csharp-tests/bin
+rocketmq-client-csharp-tests/obj
+.idea
\ No newline at end of file
diff --git a/csharp/README.md b/csharp/README.md
new file mode 100644
index 0000000..bd7145f
--- /dev/null
+++ b/csharp/README.md
@@ -0,0 +1,50 @@
+
+## Apache RocketMQ E2E
+[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
+
+RocketMQ Csharp E2E Test
+
+### Test Case Coverage
+* Message Type
+    * Normal message
+    * Transaction message
+    * Order message
+    * Delay message
+* Producer
+    * Sync Send
+    * Async Send
+* **PushConsumer (sdk not accomplished)**
+* SimpleConsumer
+    * Order/Delay/Transaction/Normal
+    * Sync receive/**Async receive  (sdk not accomplished)**
+    * Sync ack/Async ack
+* Client init(Producer/SimpleConsumer/**PushConsumer (sdk not accomplished)**)
+    * Parameter settings
+* Model
+    * broadcast
+    * cluster
+* Message
+    * Tag
+    * Body
+    * Key
+    * User property
+* Filter
+    * Tag
+    * Sql
+* Retry
+    * Normal message
+    * Order message
+
+#### How to start
+```angular2html
+# nameserver、endpoint and broker 、clustername was from ENV ALL_IP,You can view the details in common/bin/env.sh
+# cd project and run csharp e2e test case
+cd csharp && sh bin/run.sh  
+```
+##### Options
+* `ALL_IP` : required, set by GitHub actions
+* `cluster`: not required, default `DefaultCluster`
+
+####  how to replace csharp client sdk version
+```angular2html 
+# cd rocketmq-client-csharp-tests && dotnet add package RocketMQ.Client --version 5.0.1
\ No newline at end of file
diff --git a/csharp/bin/run.sh b/csharp/bin/run.sh
new file mode 100644
index 0000000..96d6eed
--- /dev/null
+++ b/csharp/bin/run.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# cd project base dir to compile mqadmin utils for other language e2e test using
+cd ../common &&  mvn -Prelease -DskipTests clean package -U
+# set env for mqadmin (use source to set linux env variables in current shell)
+cd ../rocketmq-admintools && source bin/env.sh
+# run csharp e2e test case
+cd ../csharp/rocketmq-client-csharp-tests/ && dotnet test -l "console;verbosity=detailed"
diff --git a/csharp/rocketmq-client-csharp-tests.sln b/csharp/rocketmq-client-csharp-tests.sln
new file mode 100644
index 0000000..fb34cfa
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests.sln
@@ -0,0 +1,34 @@
+
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.30114.105
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "rocketmq-client-csharp-tests", "rocketmq-client-csharp-tests\rocketmq-client-csharp-tests.csproj", "{9F749350-A3D0-423E-AFB6-79E521C777D0}"
+EndProject
+Global
+	GlobalSection(SolutionConfigurationPlatforms) = preSolution
+		Debug|Any CPU = Debug|Any CPU
+		Debug|x64 = Debug|x64
+		Debug|x86 = Debug|x86
+		Release|Any CPU = Release|Any CPU
+		Release|x64 = Release|x64
+		Release|x86 = Release|x86
+	EndGlobalSection
+	GlobalSection(SolutionProperties) = preSolution
+		HideSolutionNode = FALSE
+	EndGlobalSection
+	GlobalSection(ProjectConfigurationPlatforms) = postSolution
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x64.ActiveCfg = Debug|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x64.Build.0 = Debug|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x86.ActiveCfg = Debug|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|x86.Build.0 = Debug|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|Any CPU.Build.0 = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x64.ActiveCfg = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x64.Build.0 = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x86.ActiveCfg = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Release|x86.Build.0 = Release|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{9F749350-A3D0-423E-AFB6-79E521C777D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
+	EndGlobalSection
+EndGlobal
diff --git a/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj b/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj
new file mode 100644
index 0000000..a75e880
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/rocketmq-client-csharp-tests.csproj
@@ -0,0 +1,25 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net7.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+
+    <IsPackable>false</IsPackable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
+    <PackageReference Include="RocketMQ.Client" Version="5.1.0" />
+    <PackageReference Include="xunit" Version="2.4.1" />
+    <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
+      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+      <PrivateAssets>all</PrivateAssets>
+    </PackageReference>
+    <PackageReference Include="coverlet.collector" Version="3.1.2">
+      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+      <PrivateAssets>all</PrivateAssets>
+    </PackageReference>
+  </ItemGroup>
+
+</Project>
diff --git a/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs b/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs
new file mode 100644
index 0000000..04eac78
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/BaseTest.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Utils;
+
+namespace Rocketmq.Tests
+{
+    public class BaseTest
+    {
+        protected const int sendNum = 10;
+        protected string accessKey = string.IsNullOrEmpty(ClientUtils.ACCESS_KEY)? "":ClientUtils.ACCESS_KEY;
+        protected string secretKey = string.IsNullOrEmpty(ClientUtils.SECRET_KEY)? "":ClientUtils.SECRET_KEY;
+        protected string nameserver = ClientUtils.NAMESERVER;
+        protected string endpoints = ClientUtils.GRPC_ENDPOINT;
+        protected string clusterName = ClientUtils.CLUSTER_NAME;
+        protected string brokerAddr = ClientUtils.BROKER_ADDR;
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs
new file mode 100644
index 0000000..15fabd8
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/DelayMsgTest.cs
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System;
+
+namespace Rocketmq.Tests
+
+{
+    public class DealyMsgTest : BaseTest
+    {
+        private readonly ITestOutputHelper _output;
+
+      
+        public DealyMsgTest(ITestOutputHelper output)
+        {
+            _output = output;
+        }
+
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            MQAdminUtils.CreateDelayTopic(topic,null,clusterName,nameserver);
+            
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+
+            await simpleConsumer.Receive(32,TimeSpan.FromSeconds(15));
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes(NameUtils.RandomString(8));
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys(NameUtils.RandomString(8))
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send delay message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(30);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        await simpleConsumer.Ack(message);
+                        recvMsgIdList.Add(message.MessageId);
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack delay message: {message}");
+                    }
+
+                    await Task.Delay(2000);
+                }
+                catch (Exception ex)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecvTwice()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+        
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);       
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            MQAdminUtils.CreateDelayTopic(topic,null,clusterName,nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32,TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+           
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send delay message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        if (message.DeliveryAttempt == 2)
+                        {
+                            await simpleConsumer.Ack(message);
+                            recvMsgIdList.Add(message.MessageId);
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack delay message: {message}");
+                        }
+                        else
+                        {
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack delay message: {message}");
+                        }
+                    }
+
+                    await Task.Delay(2000);
+                }
+                catch (Exception ex)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+
+        [Fact]
+        public async Task TestSendDelayMsgSyncSimpleConsumerRecvMore()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateDelayTopic(topic,null,clusterName,nameserver);
+        
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                    .SetClientConfig(clientConfig)
+                    .SetConsumerGroup(consumerGroup)
+                    .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                    .SetSubscriptionExpression(new Dictionary<string, FilterExpression>
+                        { { topic, new FilterExpression(tag) } })
+                    .Build();
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .SetDeliveryTimestamp(DateTime.UtcNow + TimeSpan.FromSeconds(5))
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine(
+                    $"[{DateTime.UtcNow:HH:mm:ss.fff}] Send message: {message}, message id: {sendReceipt.MessageId}");
+            }
+
+            DateTime endTime = DateTime.UtcNow.AddSeconds(90);
+
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        if (message.DeliveryAttempt > 2)
+                        {
+                            await simpleConsumer.Ack(message);
+                            _output.WriteLine(
+                                $"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack message: {message}, message id: {message.MessageId}");
+                            recvMsgIdList.Add(message.MessageId);
+                        }
+                        else
+                        {
+                            _output.WriteLine(
+                                $"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack message: {message}, message id: {message.MessageId}");
+                        }
+                    }
+                }
+                catch (Exception ex)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+                await Task.Delay(2000);
+            }
+
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs
new file mode 100644
index 0000000..f70b1ce
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/FifoMsgTest.cs
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System;
+
+namespace Rocketmq.Tests
+{
+    public class FifoMsgTest : BaseTest
+    {
+        private readonly ITestOutputHelper _output;
+
+        public FifoMsgTest(ITestOutputHelper output)
+        {
+            _output = output;
+        }
+
+        [Fact]
+        public async Task TestSendFifoMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            MQAdminUtils.CreateFIFOTopic(topic, null, clusterName, nameserver);
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                const string messageGroup = "messageGroup1";
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .SetMessageGroup(messageGroup)
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send fifo message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack fifo message: {message}");
+                    await simpleConsumer.Ack(message);
+                    recvMsgIdList.Add(message.MessageId);
+                }
+            }
+
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+
+
+        [Fact]
+        public async Task TestSendFifoMsgSyncSimpleConsumerRecvTwice()
+        {
+
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateFIFOTopic(topic, null, clusterName, nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                const string messageGroup = "messageGroup2";
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .SetMessageGroup(messageGroup)
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send fifo message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(30);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    if (message.DeliveryAttempt == 2)
+                    {
+                        await simpleConsumer.Ack(message);
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack fifo message: {message}");
+                        recvMsgIdList.Add(message.MessageId);
+                    }
+                    else
+                    {
+                        _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack fifo message: {message}");
+                    }
+                }
+                await Task.Delay(2000);
+            }
+
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs
new file mode 100644
index 0000000..2e41d04
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/NormalMsgTest.cs
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System.Collections.Generic;
+using System;
+using System.Threading.Tasks;
+
+namespace Rocketmq.Tests
+{
+    public class NormalMsgTest : BaseTest
+    {
+        private readonly ITestOutputHelper _output;
+
+        public NormalMsgTest(ITestOutputHelper output)
+        {
+            _output = output;
+        }
+
+        [Fact]
+        public async Task TestSendNormalMsgSyncSimpleConsumerRecv()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateTopic(topic, null, clusterName, nameserver);
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send normal message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    await simpleConsumer.Ack(message);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack normal message: {message}");
+                    recvMsgIdList.Add(message.MessageId);
+                }
+                await Task.Delay(2000);
+            }
+            sendMsgIdList.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIdList);
+        }
+
+        [Fact]
+        public async Task TestSendNormalMsgSyncSimpleConsumerRecvRetry()
+        {
+            List<string> sendMsgIdList = new List<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+            MQAdminUtils.CreateTopic(topic, null, clusterName, nameserver);
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("yourMessageKey-7044358f98fc")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send normal message: {message}, message id: {sendReceipt.MessageId}");
+                sendMsgIdList.Add(sendReceipt.MessageId);
+            }
+
+            DateTime endTime = DateTime.UtcNow.AddSeconds(90);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    _output.WriteLine(
+                        $"[{DateTime.UtcNow:HH:mm:ss.fff}] recv normal msg: {message}, message id: {message.MessageId}");
+                }
+                await Task.Delay(2000);
+            }
+        }
+
+        // test send with not match message type
+
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs b/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs
new file mode 100644
index 0000000..455380e
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/test/TransMsgTest.cs
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Concurrent;
+using System.Text;
+using Org.Apache.Rocketmq;
+using Xunit;
+using Xunit.Abstractions;
+
+using Utils;
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace Rocketmq.Tests
+{
+    public class TransMsgTest : BaseTest
+    {
+
+        private readonly ITestOutputHelper _output;
+
+        public TransMsgTest(ITestOutputHelper output)
+        {
+            _output = output;
+        }
+
+        private class TransactionChecker : ITransactionChecker
+        {
+            private readonly ConcurrentBag<string> _sendMsgIds;
+            private readonly bool _isCommit;
+            private readonly ITestOutputHelper _output;
+
+            public TransactionChecker(ConcurrentBag<string> sendMsgIds, bool isCommit, ITestOutputHelper output)
+            {
+                _sendMsgIds = sendMsgIds;
+                _isCommit = isCommit;
+                _output = output;
+            }
+
+            public TransactionResolution Check(MessageView messageView)
+            {
+                _sendMsgIds.Add(messageView.MessageId);
+                var action = _isCommit ? "commit" : "rollback";
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Checker {action} trans msg, messageview={messageView}");
+                return _isCommit ? TransactionResolution.Commit : TransactionResolution.Rollback;
+            }
+        }
+
+        [Fact]
+        public async Task TestSendTransHalfCommitMsgSyncSimpleConsumerRecv()
+        {
+            ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .SetTransactionChecker(new TransactionChecker(sendMsgIdList, true, _output))
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var transaction = producer.BeginTransaction();
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message, transaction);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send trans message: {message}, message id: {sendReceipt.MessageId}");
+                if (i % 2 == 0)
+                {
+                    transaction.Commit();
+                    sendMsgIdList.Add(sendReceipt.MessageId);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Commit trans msg, sendReceipt={sendReceipt}");
+                }
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(120);
+            while (true)
+            {
+                if (DateTime.Now > endTime || recvMsgIdList.Count >= sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    await simpleConsumer.Ack(message);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+                    recvMsgIdList.Add(message.MessageId);
+                }
+                await Task.Delay(2000);
+            }
+
+            var sendMsgIds = sendMsgIdList.ToList();
+            sendMsgIds.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIds);
+        }
+
+        [Fact]
+        public async Task TestSendTransCheckRollbackMsgSyncSimpleConsumerRecv()
+        {
+            ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .SetTransactionChecker(new TransactionChecker(sendMsgIdList, false, _output))
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var transaction = producer.BeginTransaction();
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message, transaction);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Send trans msg, sendReceipt={sendReceipt}");
+                // Commit the transaction.
+                if (i % 2 == 0)
+                {
+                    transaction.Rollback();
+                    sendMsgIdList.Add(sendReceipt.MessageId);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Rollback trans msg, sendReceipt={sendReceipt}");
+                }
+            }
+
+            DateTime endTime = DateTime.Now.AddSeconds(60);
+            while (true)
+            {
+                if (DateTime.Now > endTime)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+                var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                foreach (var message in messageViews)
+                {
+                    await simpleConsumer.Ack(message);
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+                    recvMsgIdList.Add(message.MessageId);
+                }
+                await Task.Delay(2000);
+            }
+            Assert.Equal(recvMsgIdList.Count, 0);
+        }
+
+
+        [Fact]
+        public async Task TestSendTransCheckRollbackMsgSyncSimpleConsumerRecvRetrys()
+        {
+            ConcurrentBag<string> sendMsgIdList = new ConcurrentBag<string>();
+            List<string> recvMsgIdList = new List<string>();
+            string topic = NameUtils.GetTopicName();
+            string consumerGroup = NameUtils.GetGroupName();
+            string tag = NameUtils.RandomString(8);
+
+            MQAdminUtils.CreateTransactionTopic(topic, null, clusterName, nameserver);
+            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
+            var clientConfig = new ClientConfig.Builder()
+                .SetEndpoints(endpoints)
+                .SetCredentialsProvider(credentialsProvider)
+                .Build();
+
+            var subscription = new Dictionary<string, FilterExpression>
+                { { topic, new FilterExpression(tag) } };
+            await using var simpleConsumer = await new SimpleConsumer.Builder()
+                .SetClientConfig(clientConfig)
+                .SetConsumerGroup(consumerGroup)
+                .SetAwaitDuration(TimeSpan.FromSeconds(15))
+                .SetSubscriptionExpression(subscription)
+                .Build();
+            await simpleConsumer.Receive(32, TimeSpan.FromSeconds(15));
+
+            await using var producer = await new Producer.Builder()
+                .SetTopics(topic)
+                .SetClientConfig(clientConfig)
+                .SetTransactionChecker(new TransactionChecker(sendMsgIdList, false, _output))
+                .Build();
+
+            for (int i = 0; i < sendNum; i++)
+            {
+                var transaction = producer.BeginTransaction();
+                var bytes = Encoding.UTF8.GetBytes("foobar");
+                var message = new Message.Builder()
+                    .SetTopic(topic)
+                    .SetBody(bytes)
+                    .SetTag(tag)
+                    .SetKeys("keyA", "keyB")
+                    .Build();
+
+                var sendReceipt = await producer.Send(message, transaction);
+                // Commit the transaction.
+                transaction.Commit();
+                sendMsgIdList.Add(sendReceipt.MessageId);
+                _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Commit trans msg, sendReceipt={sendReceipt}");
+            }
+
+            DateTime endTime = DateTime.UtcNow.AddSeconds(120);
+            while (true)
+            {
+                if (DateTime.UtcNow > endTime || recvMsgIdList.Count == sendNum)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Break while loop");
+                    break;
+                }
+
+                try
+                {
+                    var messageViews = await simpleConsumer.Receive(16, TimeSpan.FromSeconds(15));
+                    foreach (var message in messageViews)
+                    {
+                        if (message.DeliveryAttempt > 2)
+                        {
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Ack trans message: {message}");
+                            await simpleConsumer.Ack(message);
+                            recvMsgIdList.Add(message.MessageId);
+                        }
+                        else
+                        {
+                            _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Recv but not ack trans msg trans message: {message}");
+                        }
+                    }
+                    await Task.Delay(2000);
+                }
+                catch (Exception ex)
+                {
+                    _output.WriteLine($"[{DateTime.UtcNow:HH:mm:ss.fff}] Exception: {ex}");
+                }
+            }
+            var sendMsgIds = sendMsgIdList.ToList();
+            sendMsgIds.Sort();
+            recvMsgIdList.Sort();
+            Assert.Equal(sendMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList.Count, sendNum);
+            Assert.Equal(recvMsgIdList, sendMsgIds);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs
new file mode 100644
index 0000000..81a9d85
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/ClientUtils.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Utils
+{
+    public class ClientUtils
+    {
+
+        public static readonly string NAMESERVER;
+        public static readonly string ACCESS_KEY;
+        public static readonly string SECRET_KEY;
+
+        public static readonly string GRPC_ENDPOINT;
+
+        public static readonly string BROKER_ADDR;
+
+        public static readonly string CLUSTER_NAME;
+
+        static ClientUtils()
+        {
+            NAMESERVER = Environment.GetEnvironmentVariable("NAMESERVER");
+            GRPC_ENDPOINT = Environment.GetEnvironmentVariable("GRPC_ENDPOINT");
+            CLUSTER_NAME = Environment.GetEnvironmentVariable("CLUSTER_NAME");
+            BROKER_ADDR = Environment.GetEnvironmentVariable("BROKER_ADDR");
+            ACCESS_KEY = Environment.GetEnvironmentVariable("ACCESS_KEY");
+            SECRET_KEY = Environment.GetEnvironmentVariable("SECRET_KEY");
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs
new file mode 100644
index 0000000..523889a
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/MQAdminUtils.cs
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.IO;
+
+namespace Utils
+{
+    public static class MQAdminUtils
+    {
+      
+        private static string GetRootPath()
+        {
+            string projectBasePath = Environment.CurrentDirectory;
+            string path = Path.GetDirectoryName(projectBasePath);
+            path=Path.GetDirectoryName(path);
+            path=Path.GetDirectoryName(path);
+            path=Path.GetDirectoryName(path);
+            path=Path.GetDirectoryName(path);
+            return path;
+        }
+        
+
+        // Execute a shell command and return its output as a string
+        public static string ExecuteShellCommand(string command)
+        {
+            var process = new Process()
+            {
+                StartInfo = new ProcessStartInfo
+                {
+                    FileName =  "/bin/bash", // Use cmd on Windows and bash on other platforms
+                    Arguments = $"-c \"{command}\"", // Wrap the command with /c or -c flags
+                    RedirectStandardOutput = true,
+                    UseShellExecute = false,
+                    CreateNoWindow = true,
+                }
+            };
+            process.Start();
+            string output = process.StandardOutput.ReadToEnd();
+            process.WaitForExit();
+            Console.WriteLine(output);
+            return output;
+        }
+        
+    
+        public static string CreateTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+        {
+            // use absolute path
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName;
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            if (!string.IsNullOrEmpty(brokerAddr))
+            {
+                command += " -b " + brokerAddr;
+            }
+            if (!string.IsNullOrEmpty(clusterName))
+            {
+                command += " -c " + clusterName;
+            }
+            Console.WriteLine(command);
+            return ExecuteShellCommand(command);
+        }
+
+        public static string CreateDelayTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+        {
+            // use absolute path
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName;
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            if (!string.IsNullOrEmpty(brokerAddr))
+            {
+                command += " -b " + brokerAddr;
+            }
+            if (!string.IsNullOrEmpty(clusterName))
+            {
+                command += " -c " + clusterName;
+            }
+            command += " -a " + "+message.type=DELAY";
+            return ExecuteShellCommand(command);
+        }
+
+        public static string CreateFIFOTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+        {
+            // use absolute path
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName;
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            if (!string.IsNullOrEmpty(brokerAddr))
+            {
+                command += " -b " + brokerAddr;
+            }
+            if (!string.IsNullOrEmpty(clusterName))
+            {
+                command += " -c " + clusterName;
+            }
+            command += " -a " + "+message.type=FIFO";
+            return ExecuteShellCommand(command);
+        }
+
+        public static string CreateTransactionTopic(string topicName, string brokerAddr, string clusterName, string nameserver)
+        {
+            // use absolute path
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin updateTopic -t " + topicName;
+
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            if (!string.IsNullOrEmpty(brokerAddr))
+            {
+                command += " -b " + brokerAddr;
+            }
+            if (!string.IsNullOrEmpty(clusterName))
+            {
+                command += " -c " + clusterName;
+            }
+            command += " -a " + "+message.type=TRANSACTION";
+            return ExecuteShellCommand(command);
+        }
+
+        public static string CreateOrderlyConsumerGroup(string consumerGroup, string brokerAddr, string clusterName, string nameserver)
+        {
+            // use absolute path
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin updateSubGroup -g " + consumerGroup;
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            if (!string.IsNullOrEmpty(brokerAddr))
+            {
+                command += " -b " + brokerAddr;
+            }
+            if (!string.IsNullOrEmpty(clusterName))
+            {
+                command += " -c " + clusterName;
+            }
+            command += " -s true -o true -m false -d false ";
+            return ExecuteShellCommand(command);
+        }
+
+        public static string ClusterList(string nameserver)
+        {
+            string path = GetRootPath();
+            string command = "sh " + path + "/rocketmq-admintools/bin/mqadmin clusterlist";
+            if (!string.IsNullOrEmpty(nameserver))
+            {
+                command += " -n " + nameserver;
+            }
+            return ExecuteShellCommand(command);
+        }
+       
+    }
+    
+}
diff --git a/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs b/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs
new file mode 100644
index 0000000..54139bb
--- /dev/null
+++ b/csharp/rocketmq-client-csharp-tests/utils/NameUtils.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+
+namespace Utils
+{
+    public static class NameUtils
+    {
+        private static readonly Random random = new Random();
+        private static readonly string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+        public static string RandomString(int length)
+        {
+            var sb = new StringBuilder(length);
+            for (int i = 0; i < length; i++)
+            {
+                sb.Append(chars[random.Next(chars.Length)]);
+            }
+            return sb.ToString();
+        }
+
+        public static string GetTopicName()
+        {
+            return "topic-" + RandomString(8);
+        }
+
+        public static string GetGroupName()
+        {
+            return "group-" + RandomString(8);
+        }
+    }
+}
\ No newline at end of file