You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:55 UTC

[rocketmq-clients] 15/28: Implement transaction message

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 7de5b3957cf7d32dcff0edee61d6ed0710febb56
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Feb 16 13:51:02 2023 +0800

    Implement transaction message
---
 csharp/examples/ProducerBenchmark.cs               |   2 +-
 csharp/examples/ProducerFifoMessageExample.cs      |  81 +++++++-------
 ...ark.cs => ProducerTransactionMessageExample.cs} |  71 +++++-------
 csharp/examples/QuickStart.cs                      |   4 +-
 csharp/rocketmq-client-csharp/Client.cs            |  18 ++-
 csharp/rocketmq-client-csharp/ClientManager.cs     |   6 +
 csharp/rocketmq-client-csharp/IClientManager.cs    |   3 +
 .../ITransaction.cs}                               |  15 +--
 .../ITransactionChecker.cs}                        |  19 ++--
 csharp/rocketmq-client-csharp/MessageView.cs       |   5 +
 csharp/rocketmq-client-csharp/Producer.cs          |  78 +++++++++++--
 csharp/rocketmq-client-csharp/SendReceipt.cs       |  15 ++-
 csharp/rocketmq-client-csharp/Transaction.cs       | 121 +++++++++++++++++++++
 .../TransactionResolution.cs}                      |  15 +--
 csharp/tests/SendResultTest.cs                     |   6 +-
 15 files changed, 322 insertions(+), 137 deletions(-)

diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 4e334104..3918666d 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -68,7 +68,7 @@ namespace examples
                 Keys = keys
             };
 
-            const int tpsLimit = 800;
+            const int tpsLimit = 500;
 
             Task.Run(async () =>
             {
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index d9a72a59..87f953c3 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -17,55 +17,60 @@
 
 using System.Collections.Generic;
 using System.Text;
+using System.Threading;
 using System.Threading.Tasks;
 using NLog;
 using Org.Apache.Rocketmq;
 
 namespace examples
 {
-    static class ProducerFifoMessageExample
+    internal static class ProducerFifoMessageExample
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
         internal static async Task QuickStart()
         {
-            // string accessKey = "yourAccessKey";
-            // string secretKey = "yourSecretKey";
-            // // Credential provider is optional for client configuration.
-            // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
-            // string endpoints = "foobar.com:8080";
-            // // In most case, you don't need to create too many producers, single pattern is recommended.
-            // var producer = new Producer(endpoints)
-            // {
-            //     CredentialsProvider = credentialsProvider
-            // };
-            // string topic = "yourFifoTopic";
-            // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
-            // // the topic route before message publishing.
-            // producer.AddTopicOfInterest(topic);
-            //
-            // await producer.Start();
-            // // Define your message body.
-            // byte[] bytes = Encoding.UTF8.GetBytes("foobar");
-            // string tag = "yourMessageTagA";
-            // // You could set multiple keys for the single message.
-            // var keys = new List<string>
-            // {
-            //     "yourMessageKey-6cc8b65ed1c8",
-            //     "yourMessageKey-43783375d9a5"
-            // };
-            // // Set topic for current message.
-            // var message = new Message(topic, bytes)
-            // {
-            //     Tag = tag,
-            //     Keys = keys,
-            //     // Essential for FIFO message, messages that belongs to the same message group follow the FIFO semantics.
-            //     MessageGroup = "yourMessageGroup0"
-            // };
-            // var sendReceipt = await producer.Send(message);
-            // Logger.Info($"Send FIFO message successfully, sendReceipt={sendReceipt}.");
-            // // Close the producer if you don't need it anymore.
-            // await producer.Shutdown();
+            const string accessKey = "5jFk0wK7OU6Uq395";
+            const string secretKey = "V1u8z19URHs4o6RQ";
+
+            // Credential provider is optional for client configuration.
+            var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+            const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
+            var clientConfig = new ClientConfig(endpoints)
+            {
+                CredentialsProvider = credentialsProvider
+            };
+            // In most case, you don't need to create too many producers, single pattern is recommended.
+            var producer = new Producer(clientConfig);
+
+            const string topic = "lingchu_fifo_topic";
+            producer.SetTopics(topic);
+            // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
+            // the topic route before message publishing.
+            await producer.Start();
+            // Define your message body.
+            var bytes = Encoding.UTF8.GetBytes("foobar");
+            const string tag = "yourMessageTagA";
+            // You could set multiple keys for the single message.
+            var keys = new List<string>
+            {
+                "yourMessageKey-7044358f98fc",
+                "yourMessageKey-f72539fbc246"
+            };
+            const string messageGroup = "yourMessageGroup";
+            // Set topic for current message.
+            var message = new Message(topic, bytes)
+            {
+                Tag = tag,
+                Keys = keys,
+                // Set message group for FIFO message.
+                MessageGroup = messageGroup
+            };
+            var sendReceipt = await producer.Send(message);
+            Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+            Thread.Sleep(9999999);
+            // Close the producer if you don't need it anymore.
+            await producer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerTransactionMessageExample.cs
similarity index 58%
copy from csharp/examples/ProducerBenchmark.cs
copy to csharp/examples/ProducerTransactionMessageExample.cs
index 4e334104..edc4d41f 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -15,31 +15,33 @@
  * limitations under the License.
  */
 
-using System;
 using System.Collections.Generic;
 using System.Text;
-using System.Threading;
 using System.Threading.Tasks;
 using NLog;
 using Org.Apache.Rocketmq;
 
 namespace examples
 {
-    public class ProducerBenchmark
+    internal static class ProducerTransactionMessageExample
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
-        private static long _counter = 0;
-
-        internal static void QuickStart()
+        private class TransactionChecker : ITransactionChecker
         {
-            const string accessKey = "amKhwEM40L61znSz";
-            const string secretKey = "bT6c3gpF3EFB10F3";
+            public TransactionResolution Check(MessageView messageView)
+            {
+                return TransactionResolution.COMMIT;
+            }
+        }
 
+        internal static async Task QuickStart()
+        {
+            const string accessKey = "yourAccessKey";
+            const string secretKey = "yourSecretKey";
             // Credential provider is optional for client configuration.
             var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
-            const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080";
+            const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
             var clientConfig = new ClientConfig(endpoints)
             {
                 CredentialsProvider = credentialsProvider
@@ -47,11 +49,12 @@ namespace examples
             // In most case, you don't need to create too many producers, single pattern is recommended.
             var producer = new Producer(clientConfig);
 
-            const string topic = "lingchu_normal_topic";
+            const string topic = "lingchu_transactional_topic";
             producer.SetTopics(topic);
-            // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
-            // the topic route before message publishing.
-            producer.Start().Wait();
+            producer.SetTransactionChecker(new TransactionChecker());
+
+            await producer.Start();
+            var transaction = producer.BeginTransaction();
             // Define your message body.
             var bytes = Encoding.UTF8.GetBytes("foobar");
             const string tag = "yourMessageTagA";
@@ -67,38 +70,14 @@ namespace examples
                 Tag = tag,
                 Keys = keys
             };
-
-            const int tpsLimit = 800;
-
-            Task.Run(async () =>
-            {
-                while (true)
-                {
-                    _semaphore.Release(tpsLimit);
-                    await Task.Delay(TimeSpan.FromMilliseconds(1000));
-                }
-            });
-
-            Task.Run(async () =>
-            {
-                while (true)
-                {
-                    Logger.Info($"Send {_counter} messages successfully.");
-                    Interlocked.Exchange(ref _counter, 0);
-                    await Task.Delay(TimeSpan.FromSeconds(1));
-                }
-            });
-
-            var tasks = new List<Task>();
-            while (true)
-            {
-                _semaphore.Wait();
-                Interlocked.Increment(ref _counter);
-                var task = producer.Send(message);
-                tasks.Add(task);
-            }
-
-            Task.WhenAll(tasks).Wait();
+            var sendReceipt = await producer.Send(message, transaction);
+            Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
+            // Commit the transaction.
+            transaction.commit();
+            // Or rollback the transaction.
+            // transaction.rollback();
+            // Close the producer if you don't need it anymore.
+            await producer.Shutdown();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 474d6063..8323218f 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -27,11 +27,11 @@ namespace examples
         {
             // Console.WriteLine(MetadataConstants.Instance.ClientVersion);
 
-            ProducerNormalMessageExample.QuickStart().Wait();
+            // ProducerNormalMessageExample.QuickStart().Wait();
             // await ProducerFifoMessageExample.QuickStart();
             // await ProducerDelayMessageExample.QuickStart();
             // await SimpleConsumerExample.QuickStart();
-            // ProducerBenchmark.QuickStart();
+            ProducerBenchmark.QuickStart();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index fc6871b9..63c24c91 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -389,10 +389,22 @@ namespace Org.Apache.Rocketmq
             return ClientConfig;
         }
 
-        public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
+        public virtual async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
             Proto.RecoverOrphanedTransactionCommand command)
         {
-            // TODO
+            Logger.Warn($"Ignore orphaned transaction recovery command from remote, which is not expected, " +
+                        $"clientId={ClientId}, endpoints={endpoints}");
+            var status = new Proto.Status
+            {
+                Code = Proto.Code.InternalError,
+                Message = "Current client don't support transaction message recovery"
+            };
+            var telemetryCommand = new Proto.TelemetryCommand
+            {
+                Status = status
+            };
+            var (_, session) = GetSession(endpoints);
+            await session.write(telemetryCommand);
         }
 
         public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
@@ -405,7 +417,7 @@ namespace Org.Apache.Rocketmq
                 Code = Proto.Code.Unsupported,
                 Message = "Message consumption verification is not supported"
             };
-            var telemetryCommand = new Proto.TelemetryCommand()
+            var telemetryCommand = new Proto.TelemetryCommand
             {
                 Status = status
             };
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index 967a2ac1..f464d461 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -146,5 +146,11 @@ namespace Org.Apache.Rocketmq
         {
             return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
         }
+
+        public async Task<Proto.EndTransactionResponse> EndTransaction(Endpoints endpoints,
+            Proto.EndTransactionRequest request, TimeSpan timeout)
+        {
+            return await GetRpcClient(endpoints).EndTransaction(_client.Sign(), request, timeout);
+        }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index df2035ab..2082584c 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -81,6 +81,9 @@ namespace Org.Apache.Rocketmq
         Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
             ChangeInvisibleDurationRequest request, TimeSpan timeout);
 
+        Task<EndTransactionResponse> EndTransaction(Endpoints endpoints, EndTransactionRequest request,
+            TimeSpan timeout);
+
         Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
similarity index 71%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/ITransaction.cs
index fae7a7bb..b9898de0 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
 namespace Org.Apache.Rocketmq
 {
-    [TestClass]
-    public class SendResultTest
+    public interface ITransaction
     {
-        [TestMethod]
-        public void testCtor()
-        {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId);
-            Assert.AreEqual(messageId, sendResult.MessageId);
-        }
+        void commit();
+
+        void rollback();
     }
 }
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransactionChecker.cs
similarity index 67%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/ITransactionChecker.cs
index fae7a7bb..f03350b1 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/ITransactionChecker.cs
@@ -15,19 +15,16 @@
  * limitations under the License.
  */
 
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
 namespace Org.Apache.Rocketmq
 {
-    [TestClass]
-    public class SendResultTest
+    public interface ITransactionChecker
     {
-        [TestMethod]
-        public void testCtor()
-        {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId);
-            Assert.AreEqual(messageId, sendResult.MessageId);
-        }
+        /// <summary>
+        /// Interface that implement this interface will be able to check transactions and
+        /// return a TransactionResolution object representing the result of the check.
+        /// </summary>
+        /// <param name="messageView"></param>
+        /// <returns></returns>
+        TransactionResolution Check(MessageView messageView);
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index dfb45e45..fd095819 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -80,6 +80,11 @@ namespace Org.Apache.Rocketmq
 
         public int DeliveryAttempt { get; }
 
+        public static MessageView FromProtobuf(Proto.Message message)
+        {
+            return FromProtobuf(message, null);
+        }
+
         public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue)
         {
             var topic = message.Topic.Name;
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 4bbc4bfa..23041e4c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -29,9 +29,9 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
         private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
-        private readonly PublishingSettings _publishingSettings;
+        internal readonly PublishingSettings PublishingSettings;
         private readonly ConcurrentDictionary<string, bool> _publishingTopics;
-
+        private ITransactionChecker _checker = null;
 
         public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
         {
@@ -47,7 +47,7 @@ namespace Org.Apache.Rocketmq
             base(clientConfig)
         {
             var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
-            _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
+            PublishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
                 clientConfig.RequestTimeout, publishingTopics);
             _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
             _publishingTopics = publishingTopics;
@@ -61,6 +61,11 @@ namespace Org.Apache.Rocketmq
             }
         }
 
+        public void SetTransactionChecker(ITransactionChecker checker)
+        {
+            _checker = checker;
+        }
+
         protected override IEnumerable<string> GetTopics()
         {
             return _publishingTopics.Keys;
@@ -115,13 +120,13 @@ namespace Org.Apache.Rocketmq
 
         private IRetryPolicy GetRetryPolicy()
         {
-            return _publishingSettings.GetRetryPolicy();
+            return PublishingSettings.GetRetryPolicy();
         }
 
         public async Task<SendReceipt> Send(Message message)
         {
             var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic);
-            var publishingMessage = new PublishingMessage(message, _publishingSettings, false);
+            var publishingMessage = new PublishingMessage(message, PublishingSettings, false);
             var retryPolicy = GetRetryPolicy();
             var maxAttempts = retryPolicy.GetMaxAttempts();
 
@@ -147,6 +152,12 @@ namespace Org.Apache.Rocketmq
             throw exception!;
         }
 
+        public async Task<SendReceipt> Send(Message message, Transaction transaction)
+        {
+            // TODO
+            return null;
+        }
+
         private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
         {
             return new Proto.SendMessageRequest
@@ -160,7 +171,7 @@ namespace Org.Apache.Rocketmq
         {
             var candidateIndex = (attempt - 1) % candidates.Count;
             var mq = candidates[candidateIndex];
-            if (_publishingSettings.IsValidateMessageType() &&
+            if (PublishingSettings.IsValidateMessageType() &&
                 !mq.AcceptMessageTypes.Contains(message.MessageType))
             {
                 throw new ArgumentException("Current message type does not match with the accept message types," +
@@ -174,7 +185,7 @@ namespace Org.Apache.Rocketmq
                 await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
             try
             {
-                var sendReceipts = SendReceipt.ProcessSendMessageResponse(response);
+                var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response);
 
                 var sendReceipt = sendReceipts.First();
                 if (attempt > 1)
@@ -206,7 +217,58 @@ namespace Org.Apache.Rocketmq
 
         public override Settings GetSettings()
         {
-            return _publishingSettings;
+            return PublishingSettings;
+        }
+
+        public override async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
+            Proto.RecoverOrphanedTransactionCommand command)
+        {
+            var messageId = command.Message.SystemProperties.MessageId;
+            if (null == _checker)
+            {
+                Logger.Error($"No transaction checker registered, ignore it, messageId={messageId}, " +
+                             $"transactionId={command.TransactionId}, endpoints={endpoints}, clientId={ClientId}");
+                return;
+            }
+
+            var message = MessageView.FromProtobuf(command.Message);
+            var transactionResolution = _checker.Check(message);
+            switch (transactionResolution)
+            {
+                case TransactionResolution.COMMIT:
+                case TransactionResolution.ROLLBACK:
+                    await EndTransaction(endpoints, message.Topic, message.MessageId, command.TransactionId,
+                        transactionResolution);
+                    break;
+                case TransactionResolution.UNKNOWN:
+                default:
+                    break;
+            }
+        }
+
+        public Transaction BeginTransaction()
+        {
+            return new Transaction(this);
+        }
+
+        internal async Task EndTransaction(Endpoints endpoints, string topic, string messageId, string transactionId,
+            TransactionResolution resolution)
+        {
+            var topicResource = new Proto.Resource
+            {
+                Name = topic
+            };
+            var request = new Proto.EndTransactionRequest
+            {
+                TransactionId = transactionId,
+                MessageId = messageId,
+                Topic = topicResource,
+                Resolution = TransactionResolution.COMMIT == resolution
+                    ? Proto.TransactionResolution.Commit
+                    : Proto.TransactionResolution.Rollback
+            };
+            var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
+            StatusChecker.Check(response.Status, request);
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index fa5c75c7..1e7c61bd 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -23,19 +23,28 @@ namespace Org.Apache.Rocketmq
 {
     public sealed class SendReceipt
     {
-        public SendReceipt(string messageId)
+        public SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
         {
             MessageId = messageId;
+            TransactionId = transactionId;
+            MessageQueue = messageQueue;
         }
 
         public string MessageId { get; }
 
+        public string TransactionId { get; }
+
+        public MessageQueue MessageQueue { get; }
+
+        public Endpoints Endpoints => MessageQueue.Broker.Endpoints;
+
         public override string ToString()
         {
             return $"{nameof(MessageId)}: {MessageId}";
         }
 
-        public static List<SendReceipt> ProcessSendMessageResponse(Proto.SendMessageResponse response)
+        public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
+            Proto.SendMessageResponse response)
         {
             var status = response.Status;
             foreach (var entry in response.Entries)
@@ -48,7 +57,7 @@ namespace Org.Apache.Rocketmq
 
             // May throw exception.
             StatusChecker.Check(status, response);
-            return response.Entries.Select(entry => new SendReceipt(entry.MessageId)).ToList();
+            return response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
new file mode 100644
index 00000000..e44c0675
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Org.Apache.Rocketmq
+{
+    public class Transaction : ITransaction
+    {
+        private const int MaxMessageNum = 1;
+
+        private readonly Producer _producer;
+        private readonly HashSet<PublishingMessage> _messages;
+        private readonly ReaderWriterLockSlim _messagesLock;
+        private readonly ConcurrentDictionary<PublishingMessage, SendReceipt> _messageSendReceiptDict;
+
+        public Transaction(Producer producer)
+        {
+            _producer = producer;
+            _messages = new HashSet<PublishingMessage>();
+            _messagesLock = new ReaderWriterLockSlim();
+            _messageSendReceiptDict = new ConcurrentDictionary<PublishingMessage, SendReceipt>();
+        }
+
+        public PublishingMessage TryAddMessage(Message message)
+        {
+            _messagesLock.EnterReadLock();
+            try
+            {
+                if (_messages.Count > MaxMessageNum)
+                {
+                    throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
+                }
+            }
+            finally
+            {
+                _messagesLock.ExitReadLock();
+            }
+
+            _messagesLock.EnterWriteLock();
+            try
+            {
+                if (_messages.Count > MaxMessageNum)
+                {
+                    throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
+                }
+
+                var publishingMessage = new PublishingMessage(message, _producer.PublishingSettings, true);
+                _messages.Add(publishingMessage);
+                return publishingMessage;
+            }
+            finally
+            {
+                _messagesLock.ExitWriteLock();
+            }
+        }
+
+        public void TryAddReceipt(PublishingMessage publishingMessage, SendReceipt sendReceipt)
+        {
+            _messagesLock.EnterReadLock();
+            try
+            {
+                if (!_messages.Contains(publishingMessage))
+                {
+                    throw new ArgumentException("Message is not in the transaction");
+                }
+
+                _messageSendReceiptDict[publishingMessage] = sendReceipt;
+            }
+            finally
+            {
+                _messagesLock.ExitReadLock();
+            }
+        }
+
+        public async void commit()
+        {
+            if (_messageSendReceiptDict.IsEmpty)
+            {
+                throw new ArgumentException("Transactional message has not been sent yet");
+            }
+
+            foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict)
+            {
+                await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId,
+                    sendReceipt.TransactionId, TransactionResolution.COMMIT);
+            }
+        }
+
+        public async void rollback()
+        {
+            if (_messageSendReceiptDict.IsEmpty)
+            {
+                throw new ArgumentException("Transaction message has not been sent yet");
+            }
+
+            foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict)
+            {
+                await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId,
+                    sendReceipt.TransactionId, TransactionResolution.ROLLBACK);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/TransactionResolution.cs
similarity index 71%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/TransactionResolution.cs
index fae7a7bb..5bb4d5e1 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/TransactionResolution.cs
@@ -15,19 +15,12 @@
  * limitations under the License.
  */
 
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
 namespace Org.Apache.Rocketmq
 {
-    [TestClass]
-    public class SendResultTest
+    public enum TransactionResolution
     {
-        [TestMethod]
-        public void testCtor()
-        {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId);
-            Assert.AreEqual(messageId, sendResult.MessageId);
-        }
+        COMMIT,
+        ROLLBACK,
+        UNKNOWN
     }
 }
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/tests/SendResultTest.cs
index fae7a7bb..262410da 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/tests/SendResultTest.cs
@@ -25,9 +25,9 @@ namespace Org.Apache.Rocketmq
         [TestMethod]
         public void testCtor()
         {
-            string messageId = new string("abc");
-            var sendResult = new SendReceipt(messageId);
-            Assert.AreEqual(messageId, sendResult.MessageId);
+            // string messageId = new string("abc");
+            // var sendResult = new SendReceipt(messageId);
+            // Assert.AreEqual(messageId, sendResult.MessageId);
         }
     }
 }
\ No newline at end of file