You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:55 UTC
[rocketmq-clients] 15/28: Implement transaction message
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7de5b3957cf7d32dcff0edee61d6ed0710febb56
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Feb 16 13:51:02 2023 +0800
Implement transaction message
---
csharp/examples/ProducerBenchmark.cs | 2 +-
csharp/examples/ProducerFifoMessageExample.cs | 81 +++++++-------
...ark.cs => ProducerTransactionMessageExample.cs} | 71 +++++-------
csharp/examples/QuickStart.cs | 4 +-
csharp/rocketmq-client-csharp/Client.cs | 18 ++-
csharp/rocketmq-client-csharp/ClientManager.cs | 6 +
csharp/rocketmq-client-csharp/IClientManager.cs | 3 +
.../ITransaction.cs} | 15 +--
.../ITransactionChecker.cs} | 19 ++--
csharp/rocketmq-client-csharp/MessageView.cs | 5 +
csharp/rocketmq-client-csharp/Producer.cs | 78 +++++++++++--
csharp/rocketmq-client-csharp/SendReceipt.cs | 15 ++-
csharp/rocketmq-client-csharp/Transaction.cs | 121 +++++++++++++++++++++
.../TransactionResolution.cs} | 15 +--
csharp/tests/SendResultTest.cs | 6 +-
15 files changed, 322 insertions(+), 137 deletions(-)
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerBenchmark.cs
index 4e334104..3918666d 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerBenchmark.cs
@@ -68,7 +68,7 @@ namespace examples
Keys = keys
};
- const int tpsLimit = 800;
+ const int tpsLimit = 500;
Task.Run(async () =>
{
diff --git a/csharp/examples/ProducerFifoMessageExample.cs b/csharp/examples/ProducerFifoMessageExample.cs
index d9a72a59..87f953c3 100644
--- a/csharp/examples/ProducerFifoMessageExample.cs
+++ b/csharp/examples/ProducerFifoMessageExample.cs
@@ -17,55 +17,60 @@
using System.Collections.Generic;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using NLog;
using Org.Apache.Rocketmq;
namespace examples
{
- static class ProducerFifoMessageExample
+ /// <summary>
+ /// Example that starts a producer, sends one message carrying a message group (FIFO message),
+ /// logs the send receipt and shuts the producer down.
+ /// </summary>
+ internal static class ProducerFifoMessageExample
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
internal static async Task QuickStart()
{
- // string accessKey = "yourAccessKey";
- // string secretKey = "yourSecretKey";
- // // Credential provider is optional for client configuration.
- // var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- // string endpoints = "foobar.com:8080";
- // // In most case, you don't need to create too many producers, single pattern is recommended.
- // var producer = new Producer(endpoints)
- // {
- // CredentialsProvider = credentialsProvider
- // };
- // string topic = "yourFifoTopic";
- // // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // // the topic route before message publishing.
- // producer.AddTopicOfInterest(topic);
- //
- // await producer.Start();
- // // Define your message body.
- // byte[] bytes = Encoding.UTF8.GetBytes("foobar");
- // string tag = "yourMessageTagA";
- // // You could set multiple keys for the single message.
- // var keys = new List<string>
- // {
- // "yourMessageKey-6cc8b65ed1c8",
- // "yourMessageKey-43783375d9a5"
- // };
- // // Set topic for current message.
- // var message = new Message(topic, bytes)
- // {
- // Tag = tag,
- // Keys = keys,
- // // Essential for FIFO message, messages that belongs to the same message group follow the FIFO semantics.
- // MessageGroup = "yourMessageGroup0"
- // };
- // var sendReceipt = await producer.Send(message);
- // Logger.Info($"Send FIFO message successfully, sendReceipt={sendReceipt}.");
- // // Close the producer if you don't need it anymore.
- // await producer.Shutdown();
+ // Placeholders only: replace them locally and never commit real credentials or endpoints.
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
+
+ // Credential provider is optional for client configuration.
+ var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
+ const string endpoints = "foobar.com:8080";
+ var clientConfig = new ClientConfig(endpoints)
+ {
+ CredentialsProvider = credentialsProvider
+ };
+ // In most cases you don't need to create many producers; the singleton pattern is recommended.
+ var producer = new Producer(clientConfig);
+
+ const string topic = "yourFifoTopic";
+ // Set the topic name(s), which is optional but recommended. It lets the producer prefetch
+ // the topic route before message publishing.
+ producer.SetTopics(topic);
+ await producer.Start();
+ // Define your message body.
+ var bytes = Encoding.UTF8.GetBytes("foobar");
+ const string tag = "yourMessageTagA";
+ // You could set multiple keys for the single message.
+ var keys = new List<string>
+ {
+ "yourMessageKey-7044358f98fc",
+ "yourMessageKey-f72539fbc246"
+ };
+ const string messageGroup = "yourMessageGroup";
+ // Set topic for current message.
+ var message = new Message(topic, bytes)
+ {
+ Tag = tag,
+ Keys = keys,
+ // Set message group for FIFO message.
+ MessageGroup = messageGroup
+ };
+ var sendReceipt = await producer.Send(message);
+ Logger.Info($"Send message successfully, sendReceipt={sendReceipt}");
+ // Close the producer if you don't need it anymore.
+ await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/ProducerBenchmark.cs b/csharp/examples/ProducerTransactionMessageExample.cs
similarity index 58%
copy from csharp/examples/ProducerBenchmark.cs
copy to csharp/examples/ProducerTransactionMessageExample.cs
index 4e334104..edc4d41f 100644
--- a/csharp/examples/ProducerBenchmark.cs
+++ b/csharp/examples/ProducerTransactionMessageExample.cs
@@ -15,31 +15,33 @@
* limitations under the License.
*/
-using System;
using System.Collections.Generic;
using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using NLog;
using Org.Apache.Rocketmq;
namespace examples
{
- public class ProducerBenchmark
+ internal static class ProducerTransactionMessageExample
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
- private static long _counter = 0;
-
- internal static void QuickStart()
+ // Minimal checker for this example: every checked message is resolved as COMMIT.
+ private class TransactionChecker : ITransactionChecker
{
- const string accessKey = "amKhwEM40L61znSz";
- const string secretKey = "bT6c3gpF3EFB10F3";
+ public TransactionResolution Check(MessageView messageView)
+ {
+ return TransactionResolution.COMMIT;
+ }
+ }
+ internal static async Task QuickStart()
+ {
+ const string accessKey = "yourAccessKey";
+ const string secretKey = "yourSecretKey";
// Credential provider is optional for client configuration.
var credentialsProvider = new StaticCredentialsProvider(accessKey, secretKey);
- const string endpoints = "rmq-cn-nwy337bf81g.cn-hangzhou.rmq.aliyuncs.com:8080";
+ const string endpoints = "rmq-cn-7mz30qjc71a.cn-hangzhou.rmq.aliyuncs.com:8080";
var clientConfig = new ClientConfig(endpoints)
{
CredentialsProvider = credentialsProvider
@@ -47,11 +49,12 @@ namespace examples
// In most case, you don't need to create too many producers, single pattern is recommended.
var producer = new Producer(clientConfig);
- const string topic = "lingchu_normal_topic";
+ const string topic = "lingchu_transactional_topic";
producer.SetTopics(topic);
- // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
- // the topic route before message publishing.
- producer.Start().Wait();
+ producer.SetTransactionChecker(new TransactionChecker());
+
+ await producer.Start();
+ var transaction = producer.BeginTransaction();
// Define your message body.
var bytes = Encoding.UTF8.GetBytes("foobar");
const string tag = "yourMessageTagA";
@@ -67,38 +70,14 @@ namespace examples
Tag = tag,
Keys = keys
};
-
- const int tpsLimit = 800;
-
- Task.Run(async () =>
- {
- while (true)
- {
- _semaphore.Release(tpsLimit);
- await Task.Delay(TimeSpan.FromMilliseconds(1000));
- }
- });
-
- Task.Run(async () =>
- {
- while (true)
- {
- Logger.Info($"Send {_counter} messages successfully.");
- Interlocked.Exchange(ref _counter, 0);
- await Task.Delay(TimeSpan.FromSeconds(1));
- }
- });
-
- var tasks = new List<Task>();
- while (true)
- {
- _semaphore.Wait();
- Interlocked.Increment(ref _counter);
- var task = producer.Send(message);
- tasks.Add(task);
- }
-
- Task.WhenAll(tasks).Wait();
+ var sendReceipt = await producer.Send(message, transaction);
+ Logger.Info("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
+ // Commit the transaction.
+ transaction.commit();
+ // Or rollback the transaction.
+ // transaction.rollback();
+ // Close the producer if you don't need it anymore.
+ await producer.Shutdown();
}
}
}
\ No newline at end of file
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 474d6063..8323218f 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -27,11 +27,11 @@ namespace examples
{
// Console.WriteLine(MetadataConstants.Instance.ClientVersion);
- ProducerNormalMessageExample.QuickStart().Wait();
+ // ProducerNormalMessageExample.QuickStart().Wait();
// await ProducerFifoMessageExample.QuickStart();
// await ProducerDelayMessageExample.QuickStart();
// await SimpleConsumerExample.QuickStart();
- // ProducerBenchmark.QuickStart();
+ ProducerBenchmark.QuickStart();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index fc6871b9..63c24c91 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -389,10 +389,22 @@ namespace Org.Apache.Rocketmq
return ClientConfig;
}
- public void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
+ /// <summary>
+ /// Default handling of an orphaned-transaction recovery command: logs a warning and writes a
+ /// telemetry command whose status is InternalError back to the session of the given endpoints.
+ /// Declared virtual so that subclasses can replace this behavior.
+ /// </summary>
+ /// <param name="endpoints">Endpoints whose session receives the reply.</param>
+ /// <param name="command">The recovery command received; not inspected here.</param>
+ // NOTE(review): this is async void, so a failure of session.write cannot be observed by the caller.
+ public virtual async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
Proto.RecoverOrphanedTransactionCommand command)
{
- // TODO
+ Logger.Warn($"Ignore orphaned transaction recovery command from remote, which is not expected, " +
+ $"clientId={ClientId}, endpoints={endpoints}");
+ var status = new Proto.Status
+ {
+ Code = Proto.Code.InternalError,
+ Message = "Current client don't support transaction message recovery"
+ };
+ var telemetryCommand = new Proto.TelemetryCommand
+ {
+ Status = status
+ };
+ var (_, session) = GetSession(endpoints);
+ await session.write(telemetryCommand);
}
public async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
@@ -405,7 +417,7 @@ namespace Org.Apache.Rocketmq
Code = Proto.Code.Unsupported,
Message = "Message consumption verification is not supported"
};
- var telemetryCommand = new Proto.TelemetryCommand()
+ var telemetryCommand = new Proto.TelemetryCommand
{
Status = status
};
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index 967a2ac1..f464d461 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -146,5 +146,11 @@ namespace Org.Apache.Rocketmq
{
return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
}
+
+        /// <summary>
+        /// Forwards an end-transaction request to the RPC client of the given endpoints,
+        /// attaching the metadata produced by the owning client's Sign().
+        /// </summary>
+        /// <param name="endpoints">Endpoints used to look up the RPC client.</param>
+        /// <param name="request">The end-transaction request to send.</param>
+        /// <param name="timeout">Timeout handed to the RPC client.</param>
+        /// <returns>The response returned by the RPC client.</returns>
+        public async Task<Proto.EndTransactionResponse> EndTransaction(Endpoints endpoints,
+            Proto.EndTransactionRequest request, TimeSpan timeout)
+        {
+            var rpcClient = GetRpcClient(endpoints);
+            var metadata = _client.Sign();
+            return await rpcClient.EndTransaction(metadata, request, timeout);
+        }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index df2035ab..2082584c 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -81,6 +81,9 @@ namespace Org.Apache.Rocketmq
Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
ChangeInvisibleDurationRequest request, TimeSpan timeout);
+ /// <summary>
+ /// Sends an end-transaction request to the given endpoints.
+ /// </summary>
+ /// <param name="endpoints">Target endpoints of the request.</param>
+ /// <param name="request">The end-transaction request.</param>
+ /// <param name="timeout">Timeout applied to the request.</param>
+ /// <returns>A task that yields the end-transaction response.</returns>
+ Task<EndTransactionResponse> EndTransaction(Endpoints endpoints, EndTransactionRequest request,
+ TimeSpan timeout);
+
Task Shutdown();
}
}
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransaction.cs
similarity index 71%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/ITransaction.cs
index fae7a7bb..b9898de0 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/ITransaction.cs
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
namespace Org.Apache.Rocketmq
{
- [TestClass]
- public class SendResultTest
+ /// <summary>
+ /// Handle of a message transaction that can be committed or rolled back.
+ /// </summary>
+ public interface ITransaction
{
- [TestMethod]
- public void testCtor()
- {
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId);
- Assert.AreEqual(messageId, sendResult.MessageId);
- }
+ /// <summary>
+ /// Commits the transaction.
+ /// </summary>
+ void commit();
+
+ /// <summary>
+ /// Rolls back the transaction.
+ /// </summary>
+ void rollback();
}
}
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/ITransactionChecker.cs
similarity index 67%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/ITransactionChecker.cs
index fae7a7bb..f03350b1 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/ITransactionChecker.cs
@@ -15,19 +15,16 @@
* limitations under the License.
*/
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
namespace Org.Apache.Rocketmq
{
- [TestClass]
- public class SendResultTest
+ /// <summary>
+ /// Callback registered on a producer to resolve transactions that the server asks the client to check.
+ /// </summary>
+ public interface ITransactionChecker
{
- [TestMethod]
- public void testCtor()
- {
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId);
- Assert.AreEqual(messageId, sendResult.MessageId);
- }
+ /// <summary>
+ /// Checks the transaction that the given message belongs to and returns its resolution.
+ /// </summary>
+ /// <param name="messageView">The transactional message whose status should be checked.</param>
+ /// <returns>The resolution of the transaction: COMMIT, ROLLBACK or UNKNOWN.</returns>
+ TransactionResolution Check(MessageView messageView);
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index dfb45e45..fd095819 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -80,6 +80,11 @@ namespace Org.Apache.Rocketmq
public int DeliveryAttempt { get; }
+        /// <summary>
+        /// Converts a protobuf message into a <see cref="MessageView"/> without a message queue
+        /// (null is passed as the queue to the two-argument overload).
+        /// </summary>
+        /// <param name="message">The protobuf message to convert.</param>
+        /// <returns>The converted message view.</returns>
+        public static MessageView FromProtobuf(Proto.Message message) => FromProtobuf(message, null);
+
public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue)
{
var topic = message.Topic.Name;
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 4bbc4bfa..23041e4c 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -29,9 +29,9 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
- private readonly PublishingSettings _publishingSettings;
+ internal readonly PublishingSettings PublishingSettings;
private readonly ConcurrentDictionary<string, bool> _publishingTopics;
-
+ private ITransactionChecker _checker = null;
public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
{
@@ -47,7 +47,7 @@ namespace Org.Apache.Rocketmq
base(clientConfig)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
- _publishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
+ PublishingSettings = new PublishingSettings(ClientId, clientConfig.Endpoints, retryPolicy,
clientConfig.RequestTimeout, publishingTopics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
_publishingTopics = publishingTopics;
@@ -61,6 +61,11 @@ namespace Org.Apache.Rocketmq
}
}
+        /// <summary>
+        /// Registers the checker consulted when an orphaned transaction has to be recovered.
+        /// A later call replaces the previously registered checker.
+        /// </summary>
+        /// <param name="checker">The transaction checker to use.</param>
+        public void SetTransactionChecker(ITransactionChecker checker) => _checker = checker;
+
protected override IEnumerable<string> GetTopics()
{
return _publishingTopics.Keys;
@@ -115,13 +120,13 @@ namespace Org.Apache.Rocketmq
private IRetryPolicy GetRetryPolicy()
{
- return _publishingSettings.GetRetryPolicy();
+ return PublishingSettings.GetRetryPolicy();
}
public async Task<SendReceipt> Send(Message message)
{
var publishingLoadBalancer = await GetPublishingLoadBalancer(message.Topic);
- var publishingMessage = new PublishingMessage(message, _publishingSettings, false);
+ var publishingMessage = new PublishingMessage(message, PublishingSettings, false);
var retryPolicy = GetRetryPolicy();
var maxAttempts = retryPolicy.GetMaxAttempts();
@@ -147,6 +152,12 @@ namespace Org.Apache.Rocketmq
throw exception!;
}
+        /// <summary>
+        /// Sends a message as part of the given transaction. Not implemented yet.
+        /// </summary>
+        /// <param name="message">The message to send.</param>
+        /// <param name="transaction">The transaction the message belongs to.</param>
+        /// <returns>
+        /// A task that currently always faults with <see cref="NotImplementedException"/>.
+        /// </returns>
+        public Task<SendReceipt> Send(Message message, Transaction transaction)
+        {
+            // TODO: implement transactional send.
+            // Fail loudly instead of completing with a null receipt, which callers would dereference
+            // (e.g. sendReceipt.MessageId) and hit a NullReferenceException far from the real cause.
+            return Task.FromException<SendReceipt>(
+                new NotImplementedException("Sending transaction message is not implemented yet"));
+        }
+
private static Proto.SendMessageRequest WrapSendMessageRequest(PublishingMessage message, MessageQueue mq)
{
return new Proto.SendMessageRequest
@@ -160,7 +171,7 @@ namespace Org.Apache.Rocketmq
{
var candidateIndex = (attempt - 1) % candidates.Count;
var mq = candidates[candidateIndex];
- if (_publishingSettings.IsValidateMessageType() &&
+ if (PublishingSettings.IsValidateMessageType() &&
!mq.AcceptMessageTypes.Contains(message.MessageType))
{
throw new ArgumentException("Current message type does not match with the accept message types," +
@@ -174,7 +185,7 @@ namespace Org.Apache.Rocketmq
await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
try
{
- var sendReceipts = SendReceipt.ProcessSendMessageResponse(response);
+ var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response);
var sendReceipt = sendReceipts.First();
if (attempt > 1)
@@ -206,7 +217,58 @@ namespace Org.Apache.Rocketmq
public override Settings GetSettings()
{
- return _publishingSettings;
+ return PublishingSettings;
+ }
+
+        /// <summary>
+        /// Handles an orphaned-transaction recovery command: asks the registered checker for the
+        /// resolution of the message carried by the command and, for COMMIT or ROLLBACK, ends the
+        /// transaction on the given endpoints. An UNKNOWN resolution sends nothing.
+        /// </summary>
+        /// <param name="endpoints">Endpoints the end-transaction request is sent to.</param>
+        /// <param name="command">The recovery command carrying the message and the transaction id.</param>
+        public override async void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
+            Proto.RecoverOrphanedTransactionCommand command)
+        {
+            // This method is async void, so nothing can await it: every exception has to be handled
+            // here, otherwise a throwing checker or a failed request escapes as an unhandled exception.
+            try
+            {
+                var messageId = command.Message.SystemProperties.MessageId;
+                if (null == _checker)
+                {
+                    Logger.Error($"No transaction checker registered, ignore it, messageId={messageId}, " +
+                                 $"transactionId={command.TransactionId}, endpoints={endpoints}, clientId={ClientId}");
+                    return;
+                }
+
+                var message = MessageView.FromProtobuf(command.Message);
+                var transactionResolution = _checker.Check(message);
+                switch (transactionResolution)
+                {
+                    case TransactionResolution.COMMIT:
+                    case TransactionResolution.ROLLBACK:
+                        await EndTransaction(endpoints, message.Topic, message.MessageId, command.TransactionId,
+                            transactionResolution);
+                        break;
+                    case TransactionResolution.UNKNOWN:
+                    default:
+                        // Nothing is sent when the checker cannot decide.
+                        break;
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Error(e, $"Failed to recover orphaned transaction, endpoints={endpoints}, " +
+                                $"clientId={ClientId}");
+            }
+        }
+
+        /// <summary>
+        /// Creates a new transaction bound to this producer.
+        /// </summary>
+        /// <returns>A fresh <see cref="Transaction"/> instance.</returns>
+        public Transaction BeginTransaction() => new Transaction(this);
+
+        /// <summary>
+        /// Builds an end-transaction request for the given message, sends it to the given endpoints
+        /// through the client manager and checks the status of the response.
+        /// </summary>
+        /// <param name="endpoints">Endpoints the request is sent to.</param>
+        /// <param name="topic">Topic name of the message.</param>
+        /// <param name="messageId">Id of the message.</param>
+        /// <param name="transactionId">Id of the transaction.</param>
+        /// <param name="resolution">COMMIT is sent as a commit; any other value is sent as a rollback.</param>
+        internal async Task EndTransaction(Endpoints endpoints, string topic, string messageId, string transactionId,
+            TransactionResolution resolution)
+        {
+            var resource = new Proto.Resource
+            {
+                Name = topic
+            };
+
+            // Anything other than COMMIT is mapped to a rollback.
+            Proto.TransactionResolution protoResolution;
+            if (resolution == TransactionResolution.COMMIT)
+            {
+                protoResolution = Proto.TransactionResolution.Commit;
+            }
+            else
+            {
+                protoResolution = Proto.TransactionResolution.Rollback;
+            }
+
+            var endTransactionRequest = new Proto.EndTransactionRequest
+            {
+                TransactionId = transactionId,
+                MessageId = messageId,
+                Topic = resource,
+                Resolution = protoResolution
+            };
+            var endTransactionResponse = await ClientManager.EndTransaction(endpoints, endTransactionRequest,
+                ClientConfig.RequestTimeout);
+            StatusChecker.Check(endTransactionResponse.Status, endTransactionRequest);
+        }
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index fa5c75c7..1e7c61bd 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -23,19 +23,28 @@ namespace Org.Apache.Rocketmq
{
public sealed class SendReceipt
{
- public SendReceipt(string messageId)
+ /// <summary>
+ /// Creates a send receipt from the message id, the transaction id and the message queue.
+ /// </summary>
+ /// <param name="messageId">Stored in MessageId.</param>
+ /// <param name="transactionId">Stored in TransactionId.</param>
+ /// <param name="messageQueue">Stored in MessageQueue.</param>
+ public SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
{
MessageId = messageId;
+ TransactionId = transactionId;
+ MessageQueue = messageQueue;
}
public string MessageId { get; }
+ public string TransactionId { get; }
+
+ public MessageQueue MessageQueue { get; }
+
+ public Endpoints Endpoints => MessageQueue.Broker.Endpoints;
+
public override string ToString()
{
return $"{nameof(MessageId)}: {MessageId}";
}
- public static List<SendReceipt> ProcessSendMessageResponse(Proto.SendMessageResponse response)
+ public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
+ Proto.SendMessageResponse response)
{
var status = response.Status;
foreach (var entry in response.Entries)
@@ -48,7 +57,7 @@ namespace Org.Apache.Rocketmq
// May throw exception.
StatusChecker.Check(status, response);
- return response.Entries.Select(entry => new SendReceipt(entry.MessageId)).ToList();
+ return response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Transaction.cs b/csharp/rocketmq-client-csharp/Transaction.cs
new file mode 100644
index 00000000..e44c0675
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Transaction.cs
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Transaction bound to a producer. It tracks the messages added to it together with their send
+    /// receipts, and ends the transaction for every receipt on commit or rollback.
+    /// </summary>
+    public class Transaction : ITransaction
+    {
+        // Maximum number of messages a single transaction may hold.
+        private const int MaxMessageNum = 1;
+
+        private readonly Producer _producer;
+        private readonly HashSet<PublishingMessage> _messages;
+        // Guards _messages, which is not a concurrent collection.
+        private readonly ReaderWriterLockSlim _messagesLock;
+        private readonly ConcurrentDictionary<PublishingMessage, SendReceipt> _messageSendReceiptDict;
+
+        /// <summary>
+        /// Creates an empty transaction for the given producer.
+        /// </summary>
+        /// <param name="producer">Producer used to end the transaction.</param>
+        public Transaction(Producer producer)
+        {
+            _producer = producer;
+            _messages = new HashSet<PublishingMessage>();
+            _messagesLock = new ReaderWriterLockSlim();
+            _messageSendReceiptDict = new ConcurrentDictionary<PublishingMessage, SendReceipt>();
+        }
+
+        /// <summary>
+        /// Wraps the message into a publishing message and adds it to this transaction.
+        /// </summary>
+        /// <param name="message">The message to add.</param>
+        /// <returns>The publishing message created for <paramref name="message"/>.</returns>
+        /// <exception cref="ArgumentException">
+        /// The transaction already holds <c>MaxMessageNum</c> messages.
+        /// </exception>
+        public PublishingMessage TryAddMessage(Message message)
+        {
+            // Cheap pre-check under the read lock before taking the write lock.
+            _messagesLock.EnterReadLock();
+            try
+            {
+                // ">=" rather than ">": the transaction is full once it holds MaxMessageNum messages.
+                if (_messages.Count >= MaxMessageNum)
+                {
+                    throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
+                }
+            }
+            finally
+            {
+                _messagesLock.ExitReadLock();
+            }
+
+            _messagesLock.EnterWriteLock();
+            try
+            {
+                // Re-check: another thread may have added a message between the two locks.
+                if (_messages.Count >= MaxMessageNum)
+                {
+                    throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
+                }
+
+                var publishingMessage = new PublishingMessage(message, _producer.PublishingSettings, true);
+                _messages.Add(publishingMessage);
+                return publishingMessage;
+            }
+            finally
+            {
+                _messagesLock.ExitWriteLock();
+            }
+        }
+
+        /// <summary>
+        /// Records the send receipt of a message previously added through <see cref="TryAddMessage"/>.
+        /// </summary>
+        /// <param name="publishingMessage">The publishing message returned by <see cref="TryAddMessage"/>.</param>
+        /// <param name="sendReceipt">The receipt obtained when the message was sent.</param>
+        /// <exception cref="ArgumentException">The message does not belong to this transaction.</exception>
+        public void TryAddReceipt(PublishingMessage publishingMessage, SendReceipt sendReceipt)
+        {
+            _messagesLock.EnterReadLock();
+            try
+            {
+                if (!_messages.Contains(publishingMessage))
+                {
+                    throw new ArgumentException("Message is not in the transaction");
+                }
+
+                _messageSendReceiptDict[publishingMessage] = sendReceipt;
+            }
+            finally
+            {
+                _messagesLock.ExitReadLock();
+            }
+        }
+
+        /// <summary>
+        /// Ends the transaction with COMMIT for every recorded send receipt.
+        /// </summary>
+        // NOTE(review): async void is kept because ITransaction declares a void method. Callers cannot
+        // await completion or catch failures (including the ArgumentException below); returning Task
+        // would need a matching change to ITransaction.
+        public async void commit()
+        {
+            if (_messageSendReceiptDict.IsEmpty)
+            {
+                throw new ArgumentException("Transactional message has not been sent yet");
+            }
+
+            foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict)
+            {
+                await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId,
+                    sendReceipt.TransactionId, TransactionResolution.COMMIT);
+            }
+        }
+
+        /// <summary>
+        /// Ends the transaction with ROLLBACK for every recorded send receipt.
+        /// </summary>
+        // NOTE(review): same async void caveat as commit().
+        public async void rollback()
+        {
+            if (_messageSendReceiptDict.IsEmpty)
+            {
+                throw new ArgumentException("Transaction message has not been sent yet");
+            }
+
+            foreach (var (publishingMessage, sendReceipt) in _messageSendReceiptDict)
+            {
+                await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, sendReceipt.MessageId,
+                    sendReceipt.TransactionId, TransactionResolution.ROLLBACK);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/rocketmq-client-csharp/TransactionResolution.cs
similarity index 71%
copy from csharp/tests/SendResultTest.cs
copy to csharp/rocketmq-client-csharp/TransactionResolution.cs
index fae7a7bb..5bb4d5e1 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/rocketmq-client-csharp/TransactionResolution.cs
@@ -15,19 +15,12 @@
* limitations under the License.
*/
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-
namespace Org.Apache.Rocketmq
{
- [TestClass]
- public class SendResultTest
+ /// <summary>
+ /// Resolution of a transaction, as returned by a transaction checker.
+ /// </summary>
+ public enum TransactionResolution
{
- [TestMethod]
- public void testCtor()
- {
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId);
- Assert.AreEqual(messageId, sendResult.MessageId);
- }
+ // The transaction should be committed.
+ COMMIT,
+ // The transaction should be rolled back.
+ ROLLBACK,
+ // The resolution cannot be determined.
+ UNKNOWN
}
}
\ No newline at end of file
diff --git a/csharp/tests/SendResultTest.cs b/csharp/tests/SendResultTest.cs
index fae7a7bb..262410da 100644
--- a/csharp/tests/SendResultTest.cs
+++ b/csharp/tests/SendResultTest.cs
@@ -25,9 +25,9 @@ namespace Org.Apache.Rocketmq
[TestMethod]
public void testCtor()
{
- string messageId = new string("abc");
- var sendResult = new SendReceipt(messageId);
- Assert.AreEqual(messageId, sendResult.MessageId);
+ // Exercise the three-argument constructor instead of disabling the test;
+ // no message queue is needed to read back the two ids.
+ const string messageId = "abc";
+ const string transactionId = "txn-abc";
+ var sendResult = new SendReceipt(messageId, transactionId, null);
+ Assert.AreEqual(messageId, sendResult.MessageId);
+ Assert.AreEqual(transactionId, sendResult.TransactionId);
}
}
}
\ No newline at end of file