You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:15:04 UTC

[rocketmq-clients] 24/28: Add state machine for rocketmq producer/simpleConsumer

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit f5c2878af5c8e542148f81180c35ce47735d23a5
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Feb 22 17:52:56 2023 +0800

    Add state machine for rocketmq producer/simpleConsumer
---
 csharp/rocketmq-client-csharp/Client.cs         |  4 +++
 csharp/rocketmq-client-csharp/Producer.cs       | 34 +++++++++++++++----
 csharp/rocketmq-client-csharp/SimpleConsumer.cs | 45 ++++++++++++++++++++-----
 csharp/rocketmq-client-csharp/State.cs          |  7 +++-
 4 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index e0a4c553..a9cd093a 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -58,6 +58,8 @@ namespace Org.Apache.Rocketmq
         private readonly Dictionary<Endpoints, Session> _sessionsTable;
         private readonly ReaderWriterLockSlim _sessionLock;
 
+        protected volatile State State;
+
         protected Client(ClientConfig clientConfig)
         {
             ClientConfig = clientConfig;
@@ -75,6 +77,8 @@ namespace Org.Apache.Rocketmq
 
             _sessionsTable = new Dictionary<Endpoints, Session>();
             _sessionLock = new ReaderWriterLockSlim();
+
+            State = State.New;
         }
 
         public virtual async Task Start()
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index c4e78e91..d376d14c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -73,16 +73,36 @@ namespace Org.Apache.Rocketmq
 
         public override async Task Start()
         {
-            Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}");
-            await base.Start();
-            Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}");
+            try
+            {
+                State = State.Starting;
+                Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}");
+                await base.Start();
+                Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}");
+                State = State.Running;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
         }
 
         public override async Task Shutdown()
         {
-            Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}");
-            await base.Shutdown();
-            Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}");
+            try
+            {
+                State = State.Stopping;
+                Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}");
+                await base.Shutdown();
+                Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}");
+                State = State.Terminated;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
         }
 
         protected override Proto::HeartbeatRequest WrapHeartbeatRequest()
@@ -165,7 +185,7 @@ namespace Org.Apache.Rocketmq
 
         public async Task<SendReceipt> Send(Message message, ITransaction transaction)
         {
-            var tx = (Transaction) transaction;
+            var tx = (Transaction)transaction;
             var publishingMessage = tx.TryAddMessage(message);
             var sendReceipt = await Send(message, true);
             tx.TryAddReceipt(publishingMessage, sendReceipt);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index f6abd5ea..bf9614e1 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -55,30 +55,59 @@ namespace Org.Apache.Rocketmq
 
         public async Task Subscribe(string topic, FilterExpression filterExpression)
         {
-            // TODO: check running status.
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Simple consumer is not running");
+            }
+
             await GetSubscriptionLoadBalancer(topic);
             _subscriptionExpressions.TryAdd(topic, filterExpression);
         }
 
         public void Unsubscribe(string topic)
         {
+            if (State.Running != State)
+            {
+                throw new InvalidOperationException("Simple consumer is not running");
+            }
+
             _subscriptionExpressions.TryRemove(topic, out _);
         }
 
         public override async Task Start()
         {
-            Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
-            await base.Start();
-            Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+            try
+            {
+                State = State.Starting;
+                Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
+                await base.Start();
+                Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+                State = State.Running;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
         }
 
         public override async Task Shutdown()
         {
-            Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
-            await base.Shutdown();
-            Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+            try
+            {
+                State = State.Stopping;
+                Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
+                await base.Shutdown();
+                Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+                State = State.Terminated;
+            }
+            catch (Exception)
+            {
+                State = State.Failed;
+                throw;
+            }
         }
-        
+
         protected override IEnumerable<string> GetTopics()
         {
             return _subscriptionExpressions.Keys;
diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs
index 1dbd6b30..e353df50 100644
--- a/csharp/rocketmq-client-csharp/State.cs
+++ b/csharp/rocketmq-client-csharp/State.cs
@@ -19,6 +19,11 @@ namespace Org.Apache.Rocketmq
 {
     public enum State
     {
-        
+        New,
+        Starting,
+        Running,
+        Stopping,
+        Terminated,
+        Failed
     }
 }
\ No newline at end of file