You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:15:05 UTC

[rocketmq-clients] 25/28: Apply state machine in transactional message

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 15007046eae023bf6320ec1605fbb3f02845d589
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Feb 22 18:44:31 2023 +0800

    Apply state machine in transactional message
---
 csharp/examples/ProducerTransactionMessageExample.cs |  2 +-
 csharp/rocketmq-client-csharp/Client.cs              |  2 +-
 csharp/rocketmq-client-csharp/ITransaction.cs        |  4 ++--
 csharp/rocketmq-client-csharp/Producer.cs            | 13 +++++++++++--
 csharp/rocketmq-client-csharp/SimpleConsumer.cs      | 15 +++++++++++++++
 csharp/rocketmq-client-csharp/Transaction.cs         | 14 ++++++++++++--
 6 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index edc4d41f..8b331722 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -73,7 +73,7 @@ namespace examples
             var sendReceipt = await producer.Send(message, transaction);
             Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
             // Commit the transaction.
-            transaction.commit();
+            transaction.Commit();
             // Or rollback the transaction.
             // transaction.rollback();
             // Close the producer if you don't need it anymore.
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index a9cd093a..305383e2 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -58,7 +58,7 @@ namespace Org.Apache.Rocketmq
         private readonly Dictionary<Endpoints, Session> _sessionsTable;
         private readonly ReaderWriterLockSlim _sessionLock;
 
-        protected volatile State State;
+        internal volatile State State;
 
         protected Client(ClientConfig clientConfig)
         {
diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
index b9898de0..27c770b1 100644
--- a/csharp/rocketmq-client-csharp/ITransaction.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -19,8 +19,8 @@ namespace Org.Apache.Rocketmq
 {
     public interface ITransaction
     {
-        void commit();
+        void Commit();
 
-        void rollback();
+        void Rollback();
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index d376d14c..838263a2 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -179,12 +179,20 @@ namespace Org.Apache.Rocketmq
 
         public async Task<SendReceipt> Send(Message message)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Producer is not running");
+            }
             var sendReceipt = await Send(message, false);
             return sendReceipt;
         }
 
         public async Task<SendReceipt> Send(Message message, ITransaction transaction)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Producer is not running");
+            }
             var tx = (Transaction)transaction;
             var publishingMessage = tx.TryAddMessage(message);
             var sendReceipt = await Send(message, true);
@@ -223,8 +231,9 @@ namespace Org.Apache.Rocketmq
                 var sendReceipt = sendReceipts.First();
                 if (attempt > 1)
                 {
-                    Logger.Info($"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
-                                $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+                    Logger.Info(
+                        $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+                        $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
                 }
 
                 return sendReceipt;
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index bf9614e1..d25dceab 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -157,6 +157,11 @@ namespace Org.Apache.Rocketmq
 
         public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Simple consumer is not running");
+            }
+
             if (maxMessageNum <= 0)
             {
                 throw new InternalErrorException("maxMessageNum must be greater than 0");
@@ -182,6 +187,11 @@ namespace Org.Apache.Rocketmq
 
         public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Simple consumer is not running");
+            }
+
             var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
             var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
                 request, ClientConfig.RequestTimeout);
@@ -191,6 +201,11 @@ namespace Org.Apache.Rocketmq
 
         public async Task Ack(MessageView messageView)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Simple consumer is not running");
+            }
+
             var request = WrapAckMessageRequest(messageView);
             var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
                 ClientConfig.RequestTimeout);
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
index e44c0675..5084ae4e 100644
--- a/csharp/rocketmq-client-csharp/Transaction.cs
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -90,8 +90,13 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public async void commit()
+        public async void Commit()
         {
+            if (State.Running != _producer.State)
+            {
+                throw new InvalidOperationException("Producer is not running");
+            }
+            
             if (_messageSendReceiptDict.IsEmpty)
             {
                 throw new ArgumentException("Transactional message has not been sent yet");
@@ -104,8 +109,13 @@ namespace Org.Apache.Rocketmq
             }
         }
 
-        public async void rollback()
+        public async void Rollback()
         {
+            if (State.Running != _producer.State)
+            {
+                throw new InvalidOperationException("Producer is not running");
+            }
+            
             if (_messageSendReceiptDict.IsEmpty)
             {
                 throw new ArgumentException("Transaction message has not been sent yet");