You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:15:04 UTC
[rocketmq-clients] 24/28: Add state machine for rocketmq producer/simpleConsumer
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f5c2878af5c8e542148f81180c35ce47735d23a5
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Feb 22 17:52:56 2023 +0800
Add state machine for rocketmq producer/simpleConsumer
---
csharp/rocketmq-client-csharp/Client.cs | 4 +++
csharp/rocketmq-client-csharp/Producer.cs | 34 +++++++++++++++----
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 45 ++++++++++++++++++++-----
csharp/rocketmq-client-csharp/State.cs | 7 +++-
4 files changed, 74 insertions(+), 16 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index e0a4c553..a9cd093a 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -58,6 +58,8 @@ namespace Org.Apache.Rocketmq
private readonly Dictionary<Endpoints, Session> _sessionsTable;
private readonly ReaderWriterLockSlim _sessionLock;
+ protected volatile State State;
+
protected Client(ClientConfig clientConfig)
{
ClientConfig = clientConfig;
@@ -75,6 +77,8 @@ namespace Org.Apache.Rocketmq
_sessionsTable = new Dictionary<Endpoints, Session>();
_sessionLock = new ReaderWriterLockSlim();
+
+ State = State.New;
}
public virtual async Task Start()
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index c4e78e91..d376d14c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -73,16 +73,36 @@ namespace Org.Apache.Rocketmq
public override async Task Start()
{
- Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}");
- await base.Start();
- Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}");
+ try
+ {
+ State = State.Starting;
+ Logger.Info($"Begin to start the rocketmq producer, clientId={ClientId}");
+ await base.Start();
+ Logger.Info($"The rocketmq producer starts successfully, clientId={ClientId}");
+ State = State.Running;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
}
public override async Task Shutdown()
{
- Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}");
- await base.Shutdown();
- Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}");
+ try
+ {
+ State = State.Stopping;
+ Logger.Info($"Begin to shutdown the rocketmq producer, clientId={ClientId}");
+ await base.Shutdown();
+ Logger.Info($"Shutdown the rocketmq producer successfully, clientId={ClientId}");
+ State = State.Terminated;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
}
protected override Proto::HeartbeatRequest WrapHeartbeatRequest()
@@ -165,7 +185,7 @@ namespace Org.Apache.Rocketmq
public async Task<SendReceipt> Send(Message message, ITransaction transaction)
{
- var tx = (Transaction) transaction;
+ var tx = (Transaction)transaction;
var publishingMessage = tx.TryAddMessage(message);
var sendReceipt = await Send(message, true);
tx.TryAddReceipt(publishingMessage, sendReceipt);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index f6abd5ea..bf9614e1 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -55,30 +55,59 @@ namespace Org.Apache.Rocketmq
public async Task Subscribe(string topic, FilterExpression filterExpression)
{
- // TODO: check running status.
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Simple consumer is not running");
+ }
+
await GetSubscriptionLoadBalancer(topic);
_subscriptionExpressions.TryAdd(topic, filterExpression);
}
public void Unsubscribe(string topic)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Simple consumer is not running");
+ }
+
_subscriptionExpressions.TryRemove(topic, out _);
}
public override async Task Start()
{
- Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
- await base.Start();
- Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+ try
+ {
+ State = State.Starting;
+ Logger.Info($"Begin to start the rocketmq simple consumer, clientId={ClientId}");
+ await base.Start();
+ Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+ State = State.Running;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
}
public override async Task Shutdown()
{
- Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
- await base.Shutdown();
- Logger.Info($"The rocketmq simple consumer starts successfully, clientId={ClientId}");
+ try
+ {
+ State = State.Stopping;
+ Logger.Info($"Begin to shutdown the rocketmq simple consumer, clientId={ClientId}");
+ await base.Shutdown();
+ Logger.Info($"Shutdown the rocketmq simple consumer successfully, clientId={ClientId}");
+ State = State.Terminated;
+ }
+ catch (Exception)
+ {
+ State = State.Failed;
+ throw;
+ }
}
-
+
protected override IEnumerable<string> GetTopics()
{
return _subscriptionExpressions.Keys;
diff --git a/csharp/rocketmq-client-csharp/State.cs b/csharp/rocketmq-client-csharp/State.cs
index 1dbd6b30..e353df50 100644
--- a/csharp/rocketmq-client-csharp/State.cs
+++ b/csharp/rocketmq-client-csharp/State.cs
@@ -19,6 +19,11 @@ namespace Org.Apache.Rocketmq
{
public enum State
{
-
+ New,
+ Starting,
+ Running,
+ Stopping,
+ Terminated,
+ Failed
}
}
\ No newline at end of file