You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:15:05 UTC
[rocketmq-clients] 25/28: Apply state machine in transactional message
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 15007046eae023bf6320ec1605fbb3f02845d589
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Feb 22 18:44:31 2023 +0800
Apply state machine in transactional message
---
csharp/examples/ProducerTransactionMessageExample.cs | 2 +-
csharp/rocketmq-client-csharp/Client.cs | 2 +-
csharp/rocketmq-client-csharp/ITransaction.cs | 4 ++--
csharp/rocketmq-client-csharp/Producer.cs | 13 +++++++++++--
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 15 +++++++++++++++
csharp/rocketmq-client-csharp/Transaction.cs | 14 ++++++++++++--
6 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/csharp/examples/ProducerTransactionMessageExample.cs b/csharp/examples/ProducerTransactionMessageExample.cs
index edc4d41f..8b331722 100644
--- a/csharp/examples/ProducerTransactionMessageExample.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -73,7 +73,7 @@ namespace examples
var sendReceipt = await producer.Send(message, transaction);
Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
// Commit the transaction.
- transaction.commit();
+ transaction.Commit();
// Or rollback the transaction.
// transaction.rollback();
// Close the producer if you don't need it anymore.
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index a9cd093a..305383e2 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -58,7 +58,7 @@ namespace Org.Apache.Rocketmq
private readonly Dictionary<Endpoints, Session> _sessionsTable;
private readonly ReaderWriterLockSlim _sessionLock;
- protected volatile State State;
+ internal volatile State State;
protected Client(ClientConfig clientConfig)
{
diff --git a/csharp/rocketmq-client-csharp/ITransaction.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
index b9898de0..27c770b1 100644
--- a/csharp/rocketmq-client-csharp/ITransaction.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -19,8 +19,8 @@ namespace Org.Apache.Rocketmq
{
public interface ITransaction
{
- void commit();
+ void Commit();
- void rollback();
+ void Rollback();
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index d376d14c..838263a2 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -179,12 +179,20 @@ namespace Org.Apache.Rocketmq
public async Task<SendReceipt> Send(Message message)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Producer is not running");
+ }
var sendReceipt = await Send(message, false);
return sendReceipt;
}
public async Task<SendReceipt> Send(Message message, ITransaction transaction)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Producer is not running");
+ }
var tx = (Transaction)transaction;
var publishingMessage = tx.TryAddMessage(message);
var sendReceipt = await Send(message, true);
@@ -223,8 +231,9 @@ namespace Org.Apache.Rocketmq
var sendReceipt = sendReceipts.First();
if (attempt > 1)
{
- Logger.Info($"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
- $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+ Logger.Info(
+ $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+ $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
}
return sendReceipt;
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index bf9614e1..d25dceab 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -157,6 +157,11 @@ namespace Org.Apache.Rocketmq
public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisibleDuration)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Simple consumer is not running");
+ }
+
if (maxMessageNum <= 0)
{
throw new InternalErrorException("maxMessageNum must be greater than 0");
@@ -182,6 +187,11 @@ namespace Org.Apache.Rocketmq
public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Simple consumer is not running");
+ }
+
var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
request, ClientConfig.RequestTimeout);
@@ -191,6 +201,11 @@ namespace Org.Apache.Rocketmq
public async Task Ack(MessageView messageView)
{
+ if (State.Running != State)
+ {
+ throw new InvalidOperationException("Simple consumer is not running");
+ }
+
var request = WrapAckMessageRequest(messageView);
var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
ClientConfig.RequestTimeout);
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
index e44c0675..5084ae4e 100644
--- a/csharp/rocketmq-client-csharp/Transaction.cs
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -90,8 +90,13 @@ namespace Org.Apache.Rocketmq
}
}
- public async void commit()
+ public async void Commit()
{
+ if (State.Running != _producer.State)
+ {
+ throw new InvalidOperationException("Producer is not running");
+ }
+
if (_messageSendReceiptDict.IsEmpty)
{
throw new ArgumentException("Transactional message has not been sent yet");
@@ -104,8 +109,13 @@ namespace Org.Apache.Rocketmq
}
}
- public async void rollback()
+ public async void Rollback()
{
+ if (State.Running != _producer.State)
+ {
+ throw new InvalidOperationException("Producer is not running");
+ }
+
if (_messageSendReceiptDict.IsEmpty)
{
throw new ArgumentException("Transaction message has not been sent yet");