You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/09/22 16:10:06 UTC
[ignite] branch master updated: IGNITE-7369 .NET: Add Thin Client
Transactions API
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6633df5 IGNITE-7369 .NET: Add Thin Client Transactions API
6633df5 is described below
commit 6633df5d2fa98714dcc377f38f7b8e331928a3ec
Author: gurustron <gu...@gmail.com>
AuthorDate: Tue Sep 22 19:09:24 2020 +0300
IGNITE-7369 .NET: Add Thin Client Transactions API
Add explicit (ITransactionsClient.TxStart) and ambient (TransactionScope) transactions to the .NET Thin Client:
* IIgniteClient.GetTransactions
* IgniteClientConfiguration.TransactionConfiguration
---
.../Apache.Ignite.Core.Tests.csproj | 5 +
.../Client/Cache/CacheClientAbstractTxTest.cs | 926 +++++++++++++++++++++
.../Client/Cache/CacheClientLocalTxTest.cs | 39 +
.../Client/Cache/CacheClientOptimisticTxTest.cs | 159 ++++
.../CacheClientPartitionedTxDisconnectTest.cs | 86 ++
.../Client/Cache/CacheClientPartitionedTxTest.cs | 98 +++
.../Client/ClientTestBase.cs | 24 +-
.../Client/IgniteClientConfigurationTest.cs | 28 +
.../Config/Client/IgniteClientConfiguration.xml | 1 +
.../IgniteConfigurationTest.cs | 2 +
.../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 6 +
.../Apache.Ignite.Core/Client/IIgniteClient.cs | 9 +
.../Client/IgniteClientConfiguration.cs | 14 +-
.../Client/Transactions/ITransactionClient.cs | 60 ++
.../Client/Transactions/ITransactionsClient.cs | 140 ++++
.../Transactions/TransactionClientConfiguration.cs | 82 ++
.../IgniteClientConfigurationSection.xsd | 37 +
.../Impl/Client/Cache/CacheClient.cs | 58 +-
.../Impl/Client/ClientFailoverSocket.cs | 26 +-
.../Impl/Client/ClientFeatures.cs | 4 +-
.../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 4 +
.../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 22 +-
.../Transactions/ClientCacheTransactionManager.cs} | 124 ++-
.../Impl/Client/Transactions/TransactionClient.cs | 182 ++++
.../Impl/Client/Transactions/TransactionsClient.cs | 257 ++++++
.../Impl/Transactions/CacheTransactionManager.cs | 2 +-
.../Transactions/ITransaction.cs | 12 +-
27 files changed, 2311 insertions(+), 96 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 279fb14..0a4fe0f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -147,7 +147,12 @@
<Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
<Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
<Compile Include="Client\Cache\ContinuousQueryTest.cs" />
+ <Compile Include="Client\Cache\CacheClientAbstractTxTest.cs" />
+ <Compile Include="Client\Cache\CacheClientLocalTxTest.cs" />
+ <Compile Include="Client\Cache\CacheClientPartitionedTxDisconnectTest.cs" />
+ <Compile Include="Client\Cache\CacheClientPartitionedTxTest.cs" />
<Compile Include="Client\Cache\DynamicFieldSetTest.cs" />
+ <Compile Include="Client\Cache\CacheClientOptimisticTxTest.cs" />
<Compile Include="Client\Cache\SerializableObjectsTest.cs" />
<Compile Include="Client\Cache\PartitionAwarenessTest.cs" />
<Compile Include="Client\Cache\BinaryBuilderTest.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientAbstractTxTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientAbstractTxTest.cs
new file mode 100644
index 0000000..6f71021
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientAbstractTxTest.cs
@@ -0,0 +1,926 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using System.Transactions;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Client.Transactions;
+ using Apache.Ignite.Core.Impl.Client.Transactions;
+ using Apache.Ignite.Core.Transactions;
+ using NUnit.Framework;
+ using NUnit.Framework.Constraints;
+
+ /// <summary>
+ /// Transactional cache client tests.
+ /// </summary>
+ public abstract class CacheClientAbstractTxTest : ClientTestBase
+ {
+ /** All concurrency controls. */
+ private static readonly TransactionConcurrency[] AllConcurrencyControls =
+ {
+ TransactionConcurrency.Optimistic,
+ TransactionConcurrency.Pessimistic
+ };
+
+ /** All isolation levels. */
+ private static readonly TransactionIsolation[] AllIsolationLevels =
+ {
+ TransactionIsolation.Serializable,
+ TransactionIsolation.ReadCommitted,
+ TransactionIsolation.RepeatableRead
+ };
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheClientAbstractTxTest"/> class.
+ /// </summary>
+ /// <param name="serverCount">Server count; forwarded to the <see cref="ClientTestBase"/> constructor.</param>
+ /// <param name="enablePartitionAwareness">Partition awareness flag; forwarded to the base constructor.</param>
+ protected CacheClientAbstractTxTest(int serverCount, bool enablePartitionAwareness) : base(serverCount,
+ enablePartitionAwareness: enablePartitionAwareness)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Checks that the transaction defaults from <see cref="TransactionClientConfiguration"/> are exposed by
+ /// the client and used by transactions started without explicit parameters.
+ /// </summary>
+ [Test]
+ public void TestClientTransactionConfiguration()
+ {
+ var expectedTimeout =
+ TransactionClientConfiguration.DefaultDefaultTimeout.Add(TimeSpan.FromMilliseconds(1000));
+
+ var clientCfg = GetClientConfiguration();
+ clientCfg.TransactionConfiguration = new TransactionClientConfiguration {DefaultTimeout = expectedTimeout};
+
+ foreach (var expectedConcurrency in AllConcurrencyControls)
+ {
+ foreach (var expectedIsolation in AllIsolationLevels)
+ {
+ clientCfg.TransactionConfiguration.DefaultTransactionConcurrency = expectedConcurrency;
+ clientCfg.TransactionConfiguration.DefaultTransactionIsolation = expectedIsolation;
+
+ using (var client = Ignition.StartClient(clientCfg))
+ {
+ // Defaults reported by the transactions facade.
+ var txs = client.GetTransactions();
+ Assert.AreEqual(expectedConcurrency, txs.DefaultTransactionConcurrency);
+ Assert.AreEqual(expectedIsolation, txs.DefaultTransactionIsolation);
+ Assert.AreEqual(expectedTimeout, txs.DefaultTimeout);
+
+ ITransaction serverTx;
+ using (var clientTx = txs.TxStart())
+ {
+ // Client-side view of the started transaction.
+ Assert.AreEqual(clientTx, txs.Tx);
+ Assert.AreEqual(expectedConcurrency, clientTx.Concurrency);
+ Assert.AreEqual(expectedIsolation, clientTx.Isolation);
+ Assert.AreEqual(expectedTimeout, clientTx.Timeout);
+
+ // The same transaction as seen from the Ignite node.
+ serverTx = GetSingleLocalTransaction();
+ Assert.AreEqual(expectedConcurrency, serverTx.Concurrency);
+ Assert.AreEqual(expectedIsolation, serverTx.Isolation);
+ Assert.AreEqual(expectedTimeout, serverTx.Timeout);
+ }
+
+ serverTx.Dispose();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Checks that concurrency, isolation and timeout passed to TxStart are applied to the started transaction,
+ /// both for the plain transactions facade and for one obtained through WithLabel.
+ /// </summary>
+ [Test]
+ public void TestTxStartPassesParameters()
+ {
+ var expectedTimeout =
+ TransactionClientConfiguration.DefaultDefaultTimeout.Add(TimeSpan.FromMilliseconds(1000));
+
+ Func<ITransactionsClient>[] txFactories =
+ {
+ () => Client.GetTransactions(),
+ () => Client.GetTransactions().WithLabel("label")
+ };
+
+ foreach (var concurrency in AllConcurrencyControls)
+ {
+ foreach (var isolation in AllIsolationLevels)
+ {
+ foreach (var txFactory in txFactories)
+ {
+ var transactions = txFactory();
+ ITransaction serverTx;
+
+ // Overload without timeout.
+ using (var clientTx = transactions.TxStart(concurrency, isolation))
+ {
+ Assert.AreEqual(clientTx, transactions.Tx);
+ Assert.AreEqual(concurrency, clientTx.Concurrency);
+ Assert.AreEqual(isolation, clientTx.Isolation);
+
+ serverTx = GetSingleLocalTransaction();
+ Assert.AreEqual(concurrency, serverTx.Concurrency);
+ Assert.AreEqual(isolation, serverTx.Isolation);
+ }
+
+ serverTx.Dispose();
+
+ // Overload with timeout.
+ using (var clientTx = transactions.TxStart(concurrency, isolation, expectedTimeout))
+ {
+ Assert.AreEqual(concurrency, clientTx.Concurrency);
+ Assert.AreEqual(isolation, clientTx.Isolation);
+ Assert.AreEqual(expectedTimeout, clientTx.Timeout);
+
+ serverTx = GetSingleLocalTransaction();
+ Assert.AreEqual(concurrency, serverTx.Concurrency);
+ Assert.AreEqual(isolation, serverTx.Isolation);
+ Assert.AreEqual(expectedTimeout, serverTx.Timeout);
+ }
+
+ serverTx.Dispose();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Tests that a transaction can be neither committed nor rolled back once it has been completed,
+ /// whether it was completed by an explicit commit or by disposal.
+ /// </summary>
+ [Test]
+ public void TestThrowsIfEndAlreadyCompletedTransaction()
+ {
+ var closedTx = new ReusableConstraint(Is.TypeOf<InvalidOperationException>()
+ .And.Message.Contains("Transaction")
+ .And.Message.Contains("is closed"));
+
+ // Completed by commit.
+ var tx = Client.GetTransactions().TxStart();
+ tx.Commit();
+
+ Assert.Throws(closedTx, () => tx.Commit());
+ Assert.Throws(closedTx, () => tx.Rollback());
+
+ // Completed by dispose.
+ using (tx = Client.GetTransactions().TxStart())
+ {
+ }
+
+ Assert.Throws(closedTx, () => tx.Commit());
+ Assert.Throws(closedTx, () => tx.Rollback());
+ }
+
+ /// <summary>
+ /// Tests that cache operations and commit fail once the transaction timeout has elapsed,
+ /// and that the cached value is left unchanged.
+ /// </summary>
+ [Test]
+ public void TestTimeout()
+ {
+ var txTimeout = TimeSpan.FromMilliseconds(200);
+ var txCache = GetTransactionalCache();
+ txCache.Put(1, 1);
+
+ using (var tx = Client.GetTransactions().TxStart(
+ TransactionConcurrency.Pessimistic, TransactionIsolation.ReadCommitted, txTimeout))
+ {
+ // Sleep longer than the transaction timeout.
+ Thread.Sleep(TimeSpan.FromMilliseconds(300));
+
+ var timedOut = new ReusableConstraint(Is.TypeOf<IgniteClientException>()
+ .And.Message.Contains("Cache transaction timed out"));
+
+ Assert.Throws(timedOut, () => txCache.Put(1, 10));
+ Assert.Throws(timedOut, () => tx.Commit());
+ }
+
+ Assert.AreEqual(1, txCache.Get(1));
+ }
+
+ /// <summary>
+ /// Tests that values written inside a transaction are visible after it is committed.
+ /// </summary>
+ [Test]
+ public void TestTxCommit()
+ {
+ var txCache = GetTransactionalCache();
+
+ // Initial state.
+ txCache.Put(1, 1);
+ txCache.Put(2, 2);
+
+ using (var transaction = Client.GetTransactions().TxStart())
+ {
+ txCache.Put(1, 10);
+ txCache.Put(2, 20);
+
+ transaction.Commit();
+ }
+
+ Assert.AreEqual(10, txCache.Get(1));
+ Assert.AreEqual(20, txCache.Get(2));
+ }
+
+ /// <summary>
+ /// Tests that values written inside a transaction are visible within it and discarded on rollback.
+ /// </summary>
+ [Test]
+ public void TestTxRollback()
+ {
+ var txCache = GetTransactionalCache();
+
+ // Initial state.
+ txCache.Put(1, 1);
+ txCache.Put(2, 2);
+
+ using (var transaction = Client.GetTransactions().TxStart())
+ {
+ txCache.Put(1, 10);
+ txCache.Put(2, 20);
+
+ // Updated values are visible inside the transaction.
+ Assert.AreEqual(10, txCache.Get(1));
+ Assert.AreEqual(20, txCache.Get(2));
+
+ transaction.Rollback();
+ }
+
+ Assert.AreEqual(1, txCache.Get(1));
+ Assert.AreEqual(2, txCache.Get(2));
+ }
+
+ /// <summary>
+ /// Tests that disposing a transaction without committing it discards the values written inside it.
+ /// </summary>
+ [Test]
+ public void TestTxClose()
+ {
+ var txCache = GetTransactionalCache();
+
+ // Initial state.
+ txCache.Put(1, 1);
+ txCache.Put(2, 2);
+
+ // No commit: the transaction is only disposed.
+ using (Client.GetTransactions().TxStart())
+ {
+ txCache.Put(1, 10);
+ txCache.Put(2, 20);
+ }
+
+ Assert.AreEqual(1, txCache.Get(1));
+ Assert.AreEqual(2, txCache.Get(2));
+ }
+
+ /// <summary>
+ /// Tests that starting a second transaction on a thread that already has one fails.
+ /// </summary>
+ [Test]
+ public void TestThrowsIfMultipleStarted()
+ {
+ Func<IDisposable> startTx = () => Client.GetTransactions().TxStart();
+
+ // The same start routine is used for the outer and the nested attempt.
+ TestThrowsIfMultipleStarted(startTx, startTx);
+ }
+
+ /// <summary>
+ /// Tests that different clients can start transactions in one thread, and that each client
+ /// only sees its own uncommitted changes.
+ /// </summary>
+ [Test]
+ public void TestDifferentClientsCanStartTransactions()
+ {
+ var client = Client;
+ var cache = GetTransactionalCache(client);
+ cache[1] = 1;
+ cache[2] = 2;
+
+ // The second client is created only for this test: dispose it so that its connection is not leaked.
+ using (var anotherClient = GetClient())
+ {
+ var anotherCache = GetTransactionalCache(anotherClient);
+ var concurrency = TransactionConcurrency.Optimistic;
+ var isolation = TransactionIsolation.ReadCommitted;
+ using (var tx = client.GetTransactions().TxStart(concurrency, isolation))
+ {
+ cache[1] = 10;
+ using (var anotherTx = anotherClient.GetTransactions().TxStart(concurrency, isolation))
+ {
+ // Each client tracks its own current transaction.
+ Assert.AreNotSame(tx, anotherTx);
+ Assert.AreSame(tx, client.GetTransactions().Tx);
+ Assert.AreSame(anotherTx, anotherClient.GetTransactions().Tx);
+
+ // Uncommitted change of the first client is not visible to the second one.
+ Assert.AreEqual(10, cache[1]);
+ Assert.AreEqual(1, anotherCache[1]);
+
+ anotherCache[2] = 20;
+
+ Assert.AreEqual(2, cache[2]);
+ Assert.AreEqual(20, anotherCache[2]);
+
+ anotherTx.Commit();
+
+ Assert.AreEqual(20, cache[2]);
+ }
+ }
+
+ // The first transaction was disposed without commit, the second one was committed.
+ Assert.AreEqual(1, cache[1]);
+ Assert.AreEqual(20, cache[2]);
+ Assert.AreEqual(1, anotherCache[1]);
+ Assert.AreEqual(20, anotherCache[2]);
+ }
+ }
+
+ /// <summary>
+ /// Tests thin client transactions started through WithLabel: the label is applied to the transaction,
+ /// the last label wins when WithLabel is chained, commit and rollback semantics are unchanged,
+ /// and a labeled transaction still counts as the current thread's transaction.
+ /// </summary>
+ [Test]
+ public void TestWithLabel()
+ {
+ const string label1 = "label1";
+ const string label2 = "label2";
+
+ var cache = GetTransactionalCache();
+ cache.Put(1, 1);
+ cache.Put(2, 2);
+
+ // Labeled transaction disposed without commit.
+ ITransaction igniteTx;
+ using (Client.GetTransactions().WithLabel(label1).TxStart())
+ {
+ igniteTx = GetSingleLocalTransaction();
+
+ // Expected value goes first, as in the other assertions of this test.
+ Assert.AreEqual(label1, igniteTx.Label);
+
+ cache.Put(1, 10);
+ cache.Put(2, 20);
+ }
+
+ igniteTx.Dispose();
+
+ Assert.AreEqual(1, cache.Get(1));
+ Assert.AreEqual(2, cache.Get(2));
+
+ // Labeled transaction with commit.
+ using (var tx = Client.GetTransactions().WithLabel(label1).TxStart())
+ {
+ igniteTx = GetSingleLocalTransaction();
+
+ Assert.AreEqual(label1, igniteTx.Label);
+ Assert.AreEqual(label1, tx.Label);
+
+ cache.Put(1, 10);
+ cache.Put(2, 20);
+ tx.Commit();
+ }
+
+ igniteTx.Dispose();
+
+ Assert.AreEqual(10, cache.Get(1));
+ Assert.AreEqual(20, cache.Get(2));
+
+ // Chained WithLabel: the last label is used.
+ using (var tx = Client.GetTransactions().WithLabel(label1).WithLabel(label2).TxStart())
+ {
+ igniteTx = GetSingleLocalTransaction();
+
+ Assert.AreEqual(label2, igniteTx.Label);
+ Assert.AreEqual(label2, tx.Label);
+ }
+
+ igniteTx.Dispose();
+
+ // Labeled and unlabeled transactions cannot be nested in any combination.
+ TestThrowsIfMultipleStarted(
+ () => Client.GetTransactions().WithLabel(label1).TxStart(),
+ () => Client.GetTransactions().TxStart());
+
+ TestThrowsIfMultipleStarted(
+ () => Client.GetTransactions().TxStart(),
+ () => Client.GetTransactions().WithLabel(label1).TxStart());
+
+ TestThrowsIfMultipleStarted(
+ () => Client.GetTransactions().WithLabel(label1).TxStart(),
+ () => Client.GetTransactions().WithLabel(label2).TxStart());
+ }
+
+ /// <summary>
+ /// Tests that unfinished transaction does not prevent <see cref="IIgniteClient"/>
+ /// from being garbage collected.
+ /// </summary>
+ [Test]
+ public void TestFinalizesAfterClientIsDisposed()
+ {
+ // Starts a client, records a weak reference to it in the given bag, begins a transaction
+ // that is never finished, performs a cache write and disposes the client.
+ Action<Action<IIgniteClient>, ConcurrentBag<WeakReference>> act = (startTx, weakRefs) =>
+ {
+ var client = GetClient();
+ weakRefs.Add(new WeakReference(client));
+ var cache = GetTransactionalCache(client);
+ startTx(client);
+ cache[42] = 42;
+
+ client.Dispose();
+ };
+
+ Action<IIgniteClient>[] txStarts =
+ {
+ // ReSharper disable once ObjectCreationAsStatement
+ client => new TransactionScope(),
+ client => client.GetTransactions().TxStart()
+ };
+
+ foreach (var txStart in txStarts)
+ {
+ // Fresh bag per transaction kind: with a single shared bag, references already collected
+ // for the first kind would make the assertion below pass trivially for the second one.
+ var weakRefs = new ConcurrentBag<WeakReference>();
+
+ var tasks = Enumerable.Range(0, 3)
+ .Select(i => Task.Factory.StartNew(() => act(txStart, weakRefs), TaskCreationOptions.LongRunning))
+ .ToArray();
+
+ Task.WaitAll(tasks);
+
+ GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
+ GC.WaitForPendingFinalizers();
+
+ // At least one of the clients started for this transaction kind must have been collected.
+ Assert.IsFalse(weakRefs.All(wr => wr.IsAlive));
+ }
+ }
+
+ /// <summary>
+ /// Test that GC does not close <see cref="TransactionScope"/>'s underlying transaction.
+ /// </summary>
+ [Test]
+ public void TestGcDoesNotCloseAmbientTx()
+ {
+ WeakReference weakRef = null;
+
+ // Opens a scope, performs a cache write inside it and keeps only a weak reference
+ // to the client's current transaction; the scope itself is returned to the caller.
+ Func<TransactionScope> act = () =>
+ {
+ var innerScope = new TransactionScope();
+ GetTransactionalCache()[42] = 42;
+ weakRef = new WeakReference(Client.GetTransactions().Tx);
+ return innerScope;
+ };
+
+ using (act())
+ {
+ // Force a full collection while the scope is still open.
+ GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
+ GC.WaitForPendingFinalizers();
+
+ // The transaction object must survive and still be the client's current transaction.
+ Assert.IsTrue(weakRef.IsAlive);
+ var tx = (ITransactionClient) weakRef.Target;
+ Assert.IsNotNull(Client.GetTransactions().Tx);
+ Assert.AreSame(tx, Client.GetTransactions().Tx);
+ }
+ }
+
+ /// <summary>
+ /// Tests that cache writes made inside an ambient <see cref="TransactionScope"/> are applied when
+ /// the scope is completed and discarded when it is not.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeSingleCache()
+ {
+ var txCache = GetTransactionalCache();
+
+ txCache[1] = 1;
+ txCache[2] = 2;
+
+ // Completed scope: changes are committed.
+ using (var scope = new TransactionScope())
+ {
+ txCache[1] = 10;
+ txCache[2] = 20;
+
+ scope.Complete();
+ }
+
+ Assert.AreEqual(10, txCache[1]);
+ Assert.AreEqual(20, txCache[2]);
+
+ // Scope disposed without Complete: changes are rolled back.
+ using (new TransactionScope())
+ {
+ txCache[1] = 100;
+ txCache[2] = 200;
+ }
+
+ Assert.AreEqual(10, txCache[1]);
+ Assert.AreEqual(20, txCache[2]);
+ }
+
+ /// <summary>
+ /// Tests that an ambient <see cref="TransactionScope"/> commits or rolls back writes
+ /// to two different caches together.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeMultiCache()
+ {
+ var firstCache = GetTransactionalCache();
+ var secondCache = GetTransactionalCache(firstCache.Name + "1");
+
+ firstCache[1] = 1;
+ secondCache[1] = 2;
+
+ // Completed scope: both caches are updated.
+ using (var scope = new TransactionScope())
+ {
+ firstCache.Put(1, 10);
+ secondCache.Put(1, 20);
+
+ scope.Complete();
+ }
+
+ Assert.AreEqual(10, firstCache[1]);
+ Assert.AreEqual(20, secondCache[1]);
+
+ // Scope disposed without Complete: neither cache is updated.
+ using (new TransactionScope())
+ {
+ firstCache.Put(1, 100);
+ secondCache.Put(1, 200);
+ }
+
+ Assert.AreEqual(10, firstCache[1]);
+ Assert.AreEqual(20, secondCache[1]);
+ }
+
+ /// <summary>
+ /// Tests that a manually started Ignite transaction takes precedence over an ambient
+ /// <see cref="TransactionScope"/>: the write survives the reverted scope and is committed manually.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeWithManualIgniteTx()
+ {
+ var txCache = GetTransactionalCache();
+ var txs = Client.GetTransactions();
+
+ txCache[1] = 1;
+
+ // When Ignite tx is started manually, it won't be enlisted in TransactionScope.
+ using (var manualTx = txs.TxStart())
+ {
+ // The scope is reverted: it is disposed without Complete.
+ using (new TransactionScope())
+ {
+ txCache[1] = 2;
+ }
+
+ // The manual transaction is committed.
+ manualTx.Commit();
+ }
+
+ Assert.AreEqual(2, txCache[1]);
+ }
+
+ /// <summary>
+ /// Tests that a cache write inside a scope created with <see cref="TransactionScopeOption.Suppress"/>
+ /// is applied even though the scope is never completed.
+ /// </summary>
+ [Test]
+ public void TestSuppressedTransactionScope()
+ {
+ var txCache = GetTransactionalCache();
+
+ txCache[1] = 1;
+
+ const TransactionScopeOption suppress = TransactionScopeOption.Suppress;
+
+ using (new TransactionScope(suppress))
+ {
+ txCache[1] = 2;
+ }
+
+ // Even though transaction is not completed, the value is updated, because tx is suppressed.
+ Assert.AreEqual(2, txCache[1]);
+ }
+
+ /// <summary>
+ /// Tests cache writes inside nested <see cref="TransactionScope"/> instances, with the inner scope
+ /// created using either Required or RequiresNew.
+ /// </summary>
+ [Test]
+ public void TestNestedTransactionScope()
+ {
+ var txCache = GetTransactionalCache();
+
+ txCache[1] = 1;
+
+ var nestedOptions = new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew};
+
+ foreach (var nestedOption in nestedOptions)
+ {
+ // Commit.
+ using (var outer = new TransactionScope())
+ {
+ using (var inner = new TransactionScope(nestedOption))
+ {
+ txCache[1] = 2;
+ inner.Complete();
+ }
+
+ txCache[1] = 3;
+ outer.Complete();
+ }
+
+ Assert.AreEqual(3, txCache[1]);
+
+ // Rollback.
+ using (new TransactionScope())
+ {
+ using (new TransactionScope(nestedOption))
+ {
+ txCache[1] = 4;
+ }
+
+ txCache[1] = 5;
+ }
+
+ // In case with Required option there is a single tx
+ // that gets aborted, second put executes outside the tx.
+ var expected = nestedOption == TransactionScopeOption.Required ? 5 : 3;
+ Assert.AreEqual(expected, txCache[1], nestedOption.ToString());
+ }
+ }
+
+ /// <summary>
+ /// Tests that the isolation level of an ambient <see cref="TransactionScope"/> is mapped to the expected
+ /// Ignite isolation, and that the client's default concurrency is used for the Ignite transaction.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeOptions()
+ {
+ var txCache = GetTransactionalCache();
+ var txs = (TransactionsClient) Client.GetTransactions();
+
+ // Item1: scope isolation level; Item2: expected Ignite isolation.
+ var isolationMappings = new[]
+ {
+ Tuple.Create(IsolationLevel.Serializable, TransactionIsolation.Serializable),
+ Tuple.Create(IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead),
+ Tuple.Create(IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted),
+ Tuple.Create(IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted),
+ Tuple.Create(IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted),
+ Tuple.Create(IsolationLevel.Chaos, TransactionIsolation.ReadCommitted),
+ };
+
+ foreach (var mapping in isolationMappings)
+ {
+ var scopeOptions = new TransactionOptions {IsolationLevel = mapping.Item1};
+
+ ITransaction serverTx;
+ using (new TransactionScope(TransactionScopeOption.Required, scopeOptions))
+ {
+ txCache[1] = 1;
+
+ serverTx = GetSingleLocalTransaction();
+ Assert.AreEqual(mapping.Item2, serverTx.Isolation);
+ Assert.AreEqual(txs.DefaultTransactionConcurrency, serverTx.Concurrency);
+ }
+
+ serverTx.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Runs every synchronous modifying cache operation through the transactional check
+ /// performed with <see cref="TransactionScope"/>.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeAllOperationsSync()
+ {
+ var ops = new Action<ICacheClient<int, int>, int>[]
+ {
+ (c, k) => c.Put(k, -5),
+ (c, k) => c.PutAll(new Dictionary<int, int> {{k, -7}}),
+ (c, k) =>
+ {
+ c.Remove(k);
+ c.PutIfAbsent(k, -10);
+ },
+ (c, k) => c.GetAndPut(k, -9),
+ (c, k) =>
+ {
+ c.Remove(k);
+ c.GetAndPutIfAbsent(k, -10);
+ },
+ (c, k) => c.GetAndRemove(k),
+ (c, k) => c.GetAndReplace(k, -11),
+ (c, k) => c.Remove(k),
+ (c, k) => c.RemoveAll(new[] {k}),
+ (c, k) => c.Replace(k, 100),
+ (c, k) => c.Replace(k, c[k], 100)
+ };
+
+ // Operations are checked one by one, in declaration order.
+ foreach (var op in ops)
+ {
+ CheckTxOp(op);
+ }
+ }
+
+ /// <summary>
+ /// Runs every asynchronous modifying cache operation through the transactional check
+ /// performed with <see cref="TransactionScope"/>. Currently ignored.
+ /// </summary>
+ [Test]
+ [Ignore("Async thin client transactional operations not supported.")]
+ public void TestTransactionScopeAllOperationsAsync()
+ {
+ var ops = new Action<ICacheClient<int, int>, int>[]
+ {
+ (c, k) => c.PutAsync(k, -5),
+ (c, k) => c.PutAllAsync(new Dictionary<int, int> {{k, -7}}),
+ (c, k) =>
+ {
+ c.Remove(k);
+ c.PutIfAbsentAsync(k, -10);
+ },
+ (c, k) => c.GetAndPutAsync(k, -9),
+ (c, k) =>
+ {
+ c.Remove(k);
+ c.GetAndPutIfAbsentAsync(k, -10);
+ },
+ (c, k) => c.GetAndRemoveAsync(k),
+ (c, k) => c.GetAndReplaceAsync(k, -11),
+ (c, k) => c.RemoveAsync(k),
+ (c, k) => c.RemoveAllAsync(new[] {k}),
+ (c, k) => c.ReplaceAsync(k, 100),
+ (c, k) => c.ReplaceAsync(k, c[k], 100)
+ };
+
+ // Operations are checked one by one, in declaration order.
+ foreach (var op in ops)
+ {
+ CheckTxOp(op);
+ }
+ }
+
+ /// <summary>
+ /// Tests that each synchronous read operation locks keys in Serializable mode.
+ /// </summary>
+ [Test]
+ public void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead()
+ {
+ Action<Func<ICacheClient<int, int>, int, int>>
+ check = TestTransactionScopeWithSerializableIsolationLocksKeysOnRead;
+
+ // Indexer.
+ check((c, k) => c[k]);
+
+ // Get.
+ check((c, k) => c.Get(k));
+
+ // TryGet.
+ check((c, k) =>
+ {
+ int res;
+ return c.TryGet(k, out res) ? res : 0;
+ });
+
+ // GetAll.
+ check((c, k) => c.GetAll(new[] {k}).Single().Value);
+ }
+
+ /// <summary>
+ /// Tests that async read operations lock keys in Serializable mode. Currently ignored.
+ /// </summary>
+ [Test]
+ [Ignore("Async thin client transactional operations not supported.")]
+ public void TestTransactionScopeWithSerializableIsolationLocksKeysOnReadAsync()
+ {
+ Action<Func<ICacheClient<int, int>, int, int>>
+ test = TestTransactionScopeWithSerializableIsolationLocksKeysOnRead;
+
+ test((cache, key) => cache.GetAsync(key).Result);
+ test((cache, key) => cache.TryGetAsync(key).Result.Value);
+
+ // Use the async variant here as well: this test covers async reads only,
+ // the synchronous GetAll is already covered by the sync test.
+ test((cache, key) => cache.GetAllAsync(new[] {key}).Result.Single().Value);
+ }
+
+ /// <summary>
+ /// Checks that a key read through <paramref name="readOp"/> inside a Serializable
+ /// <see cref="TransactionScope"/> keeps its value for the rest of the scope even though another thread
+ /// issues a put for the same key, and that the new value shows up after the scope completes.
+ /// </summary>
+ /// <param name="readOp">Read operation under test: receives the cache and the key, returns the value read.</param>
+ private void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead(
+ Func<ICacheClient<int, int>, int, int> readOp)
+ {
+ var cache = GetTransactionalCache();
+ cache.Put(1, 1);
+
+ var options = new TransactionOptions {IsolationLevel = IsolationLevel.Serializable};
+
+ using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
+ {
+ // First read inside the scope; a client transaction must be active afterwards.
+ Assert.AreEqual(1, readOp(cache, 1));
+ Assert.IsNotNull(Client.GetTransactions().Tx);
+
+ var evt = new ManualResetEventSlim();
+
+ // NOTE(review): the task returned by PutAsync is not awaited; presumably the put can only
+ // finish once the scope releases the key - confirm.
+ var task = Task.Factory.StartNew(() =>
+ {
+ cache.PutAsync(1, 2);
+ evt.Set();
+ });
+
+ // Wait until the concurrent put has been issued.
+ evt.Wait();
+
+ // The value seen inside the scope is still the original one.
+ Assert.AreEqual(1, readOp(cache, 1));
+
+ scope.Complete();
+
+ // Waits for the outer task only, which ends right after the event is set.
+ task.Wait();
+ }
+
+ // The concurrent put becomes visible after the scope has ended.
+ TestUtils.WaitForTrueCondition(() => 2 == readOp(cache, 1));
+ }
+
+ /// <summary>
+ /// Runs the given cache operation inside <see cref="TransactionScope"/> for every isolation level and
+ /// verifies that its effect is discarded when the scope is not completed and applied when it is.
+ /// </summary>
+ private void CheckTxOp(Action<ICacheClient<int, int>, int> act)
+ {
+ const TransactionScopeOption scopeOption = TransactionScopeOption.Required;
+
+ var levels = new[]
+ {
+ IsolationLevel.Serializable, IsolationLevel.RepeatableRead, IsolationLevel.ReadCommitted,
+ IsolationLevel.ReadUncommitted, IsolationLevel.Snapshot, IsolationLevel.Chaos
+ };
+
+ foreach (var level in levels)
+ {
+ var scopeOptions = new TransactionOptions {IsolationLevel = level};
+ var txCache = GetTransactionalCache();
+
+ txCache[1] = 1;
+ txCache[2] = 2;
+
+ // Rollback, single key.
+ using (new TransactionScope(scopeOption, scopeOptions))
+ {
+ act(txCache, 1);
+
+ Assert.IsNotNull(Client.GetTransactions().Tx, "Transaction has not started.");
+ }
+
+ Assert.AreEqual(1, txCache[1]);
+ Assert.AreEqual(2, txCache[2]);
+
+ // Rollback, two keys.
+ using (new TransactionScope(scopeOption, scopeOptions))
+ {
+ act(txCache, 1);
+ act(txCache, 2);
+ }
+
+ Assert.AreEqual(1, txCache[1]);
+ Assert.AreEqual(2, txCache[2]);
+
+ // Commit, single key: key 1 is either removed or changed, key 2 is untouched.
+ using (var scope = new TransactionScope(scopeOption, scopeOptions))
+ {
+ act(txCache, 1);
+ scope.Complete();
+ }
+
+ Assert.IsFalse(txCache.ContainsKey(1) && txCache[1] == 1);
+ Assert.AreEqual(2, txCache[2]);
+
+ // Commit, two keys: both keys are either removed or changed.
+ using (var scope = new TransactionScope(scopeOption, scopeOptions))
+ {
+ act(txCache, 1);
+ act(txCache, 2);
+ scope.Complete();
+ }
+
+ Assert.IsFalse(txCache.ContainsKey(1) && txCache[1] == 1);
+ Assert.IsFalse(txCache.ContainsKey(2) && txCache[2] == 2);
+ }
+ }
+
+ /// <summary>
+ /// Returns the only locally active transaction of the Ignite instance provided by GetIgnite;
+ /// fails unless there is exactly one.
+ /// </summary>
+ private static ITransaction GetSingleLocalTransaction()
+ {
+ var activeTxs = GetIgnite().GetTransactions().GetLocalActiveTransactions();
+
+ return activeTxs.Single();
+ }
+
+ /// <summary>
+ /// Asserts that starting <paramref name="inner"/> while <paramref name="outer"/> is still open on the
+ /// same thread fails with the "already started" client error.
+ /// </summary>
+ private void TestThrowsIfMultipleStarted(Func<IDisposable> outer, Func<IDisposable> inner)
+ {
+ var alreadyStarted = Is.TypeOf<IgniteClientException>()
+ .And.Message.Contains("A transaction has already been started by the current thread.");
+
+ TestDelegate startNested = () =>
+ {
+ using (outer())
+ {
+ using (inner())
+ {
+ // No-op.
+ }
+ }
+ };
+
+ Assert.Throws(alreadyStarted, startNested);
+ }
+
+ /// <summary>
+ /// Gets the name of the transactional cache used when no explicit name is given.
+ /// </summary>
+ protected virtual string GetCacheName()
+ {
+ const string cacheName = "client_transactional";
+
+ return cacheName;
+ }
+
+ /// <summary>
+ /// Gets or creates a transactional cache through the test fixture's client.
+ /// </summary>
+ /// <param name="cacheName">Cache name; when null, the name is chosen by the overload taking a client.</param>
+ protected ICacheClient<int, int> GetTransactionalCache(string cacheName = null)
+ {
+ var fixtureClient = Client;
+
+ return GetTransactionalCache(fixtureClient, cacheName);
+ }
+
+ /// <summary>
+ /// Gets or creates a cache with transactional atomicity mode through the given client.
+ /// </summary>
+ /// <param name="client">Client used to get or create the cache.</param>
+ /// <param name="cacheName">Cache name; when null, the result of GetCacheName is used.</param>
+ private ICacheClient<int, int> GetTransactionalCache(IIgniteClient client, string cacheName = null)
+ {
+ var cacheCfg = new CacheClientConfiguration
+ {
+ Name = cacheName ?? GetCacheName(),
+ AtomicityMode = CacheAtomicityMode.Transactional
+ };
+
+ return client.GetOrCreateCache<int, int>(cacheCfg);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientLocalTxTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientLocalTxTest.cs
new file mode 100644
index 0000000..2a6eb8c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientLocalTxTest.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ /// <summary>
+ /// Tests client transactions for single node.
+ /// </summary>
+ public class CacheClientLocalTxTest : CacheClientAbstractTxTest
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheClientLocalTxTest"/> class.
+ /// </summary>
+ public CacheClientLocalTxTest() : base(1, false)
+ {
+ // No-op.
+ }
+
+ /** <inheritDoc /> */
+ protected override string GetCacheName()
+ {
+ return "local_" + base.GetCacheName();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientOptimisticTxTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientOptimisticTxTest.cs
new file mode 100644
index 0000000..50de87a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientOptimisticTxTest.cs
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System.Linq;
+ using System.Threading.Tasks;
+ using System.Transactions;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Client.Transactions;
+ using Apache.Ignite.Core.Transactions;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for <see cref="TransactionConcurrency.Optimistic"/> mode.
+ /// </summary>
+ public class CacheClientOptimisticTxTest : ClientTestBase
+ {
+ /// <summary>
+ /// Tests that committing an explicit optimistic serializable transaction fails with a read/write conflict when another thread updates a key that was read inside the transaction.
+ /// </summary>
+ [Test]
+ public void TestExplicitOptimisticTransactionThrowsOptimisticExceptionOnConflict()
+ {
+ var cache = GetTransactionalCache();
+ cache[1] = 1;
+ var transactions = Client.GetTransactions();
+
+ ITransaction igniteTx;
+ using (var tx = transactions.TxStart())
+ {
+ igniteTx = GetSingleLocalTransaction(); // The single active transaction as seen on the server node.
+ Assert.IsNotNull(igniteTx);
+ Assert.IsNotNull(transactions.Tx);
+ Assert.AreEqual(TransactionConcurrency.Optimistic, igniteTx.Concurrency);
+ Assert.AreEqual(TransactionIsolation.Serializable, igniteTx.Isolation);
+
+ var old = cache[1]; // Read inside the transaction.
+
+ Task.Factory.StartNew(() => // Runs on another thread, where no client transaction is active.
+ {
+ Assert.IsNull(transactions.Tx);
+ cache[1] = -1; // Conflicting update made outside the transaction.
+ },
+ TaskCreationOptions.LongRunning)
+ .Wait();
+
+ Assert.AreEqual(old, cache[1]); // The transaction still observes the value it read earlier.
+ cache[1] = old + 1;
+
+ var constraint = Is.TypeOf<IgniteClientException>()
+ .And.Message
+ .StartsWith(
+ "Failed to prepare transaction, read/write conflict [key=1, keyCls=java.lang.Integer, val=-1");
+ Assert.Throws(constraint, () => tx.Commit());
+ }
+
+ Assert.AreEqual(-1, cache[1]); // Only the concurrent update is visible after the failed commit.
+
+ igniteTx.Dispose();
+ }
+
+ /// <summary>
+ /// Tests that an ambient optimistic transaction (started with <see cref="TransactionScope"/>) fails on dispose with a read/write conflict when another thread updates a key read inside it.
+ /// </summary>
+ [Test]
+ public void TestAmbientOptimisticTransactionThrowsOptimisticExceptionOnConflict()
+ {
+ var cache = GetTransactionalCache();
+ cache[1] = 1;
+ var transactions = Client.GetTransactions();
+
+ var scope = new TransactionScope(); // No using block: Dispose() is called explicitly below to assert on what it throws.
+ var old = cache[1]; // Read inside the scope; Tx is asserted to be non-null right after it.
+ Assert.IsNotNull(transactions.Tx); // NOTE(review): if an assertion fails before scope.Dispose(), the scope is never disposed.
+
+ var igniteTx = GetSingleLocalTransaction(); // The single active transaction as seen on the server node.
+ Assert.AreEqual(TransactionConcurrency.Optimistic, igniteTx.Concurrency);
+ Assert.AreEqual(TransactionIsolation.Serializable, igniteTx.Isolation);
+
+ Task.Factory.StartNew(() => // Runs on another thread, where no client transaction is active.
+ {
+ Assert.IsNull(transactions.Tx);
+ cache[1] = -1; // Conflicting update made outside the transaction.
+ },
+ TaskCreationOptions.LongRunning)
+ .Wait();
+
+ Assert.AreEqual(old, cache[1]); // The transaction still observes the value it read earlier.
+ cache[1] = old + 1;
+
+ // Complete() just sets a flag, actual Commit is called from Dispose().
+ scope.Complete();
+
+ var constraint = Is.TypeOf<IgniteClientException>()
+ .And.Message
+ .StartsWith(
+ "Failed to prepare transaction, read/write conflict [key=1, keyCls=java.lang.Integer, val=-1");
+ Assert.Throws(constraint, () => scope.Dispose());
+
+ Assert.AreEqual(-1, cache[1]); // Only the concurrent update is visible after the failed commit.
+ Assert.IsNull(transactions.Tx);
+
+ igniteTx.Dispose();
+ }
+
+ /** <inheritdoc /> */
+ protected override IgniteClientConfiguration GetClientConfiguration()
+ {
+ return new IgniteClientConfiguration(base.GetClientConfiguration())
+ {
+ TransactionConfiguration = new TransactionClientConfiguration
+ {
+ DefaultTransactionConcurrency = TransactionConcurrency.Optimistic,
+ DefaultTransactionIsolation = TransactionIsolation.Serializable
+ }
+ };
+ }
+
+ /// <summary>
+ /// Gets or creates transactional cache
+ /// </summary>
+ private ICacheClient<int, int> GetTransactionalCache()
+ {
+ return Client.GetOrCreateCache<int, int>(new CacheClientConfiguration
+ {
+ Name = TestUtils.TestName,
+ AtomicityMode = CacheAtomicityMode.Transactional
+ });
+ }
+
+ /// <summary>
+ /// Gets single transaction from Ignite.
+ /// </summary>
+ private static ITransaction GetSingleLocalTransaction()
+ {
+ return GetIgnite()
+ .GetTransactions()
+ .GetLocalActiveTransactions()
+ .Single();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxDisconnectTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxDisconnectTest.cs
new file mode 100644
index 0000000..f2ef219
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxDisconnectTest.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System.Linq;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using NUnit.Framework;
+ using NUnit.Framework.Constraints;
+
+ /// <summary>
+ /// Tests client transaction behavior when a server node is stopped, for multiple nodes with partition awareness.
+ /// </summary>
+ public class CacheClientPartitionedTxDisconnectTest : ClientTestBase
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheClientPartitionedTxDisconnectTest"/> class.
+ /// </summary>
+ public CacheClientPartitionedTxDisconnectTest() : base(3, enablePartitionAwareness: true)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Tests that cache operations inside a transaction report a lost transaction context after the server node holding that transaction is stopped, and that the client works again outside the transaction.
+ /// </summary>
+ [Test]
+ public void TestDisconnect()
+ {
+ var cache = GetTransactionalCache();
+
+ var constraint = new ReusableConstraint(Is.TypeOf<IgniteClientException>() // Reusable: applied both inside the transaction and in the catch block.
+ .And.Message.EqualTo("Transaction context has been lost due to connection errors."));
+ try
+ {
+ using (Client.GetTransactions().TxStart())
+ {
+ var igniteToStop = new[] {(int?) null, 1, 2}
+ .Select(i => GetIgnite(i))
+ .FirstOrDefault(ign => ign.GetTransactions().GetLocalActiveTransactions().Any()); // The node that has a local active transaction.
+
+ Assert.IsNotNull(igniteToStop);
+ Ignition.Stop(igniteToStop.Name, true);
+
+ Assert.Catch(() => cache.Put(1, 1)); // First call after the node stop: any exception is accepted.
+ Assert.Throws(constraint, () => cache.Put(1, 1)); // Second call must report the lost transaction context.
+ }
+ }
+ catch (IgniteClientException ex) // Any IgniteClientException escaping the using block must match the same constraint.
+ {
+ Assert.That(ex, constraint);
+ }
+
+ Assert.DoesNotThrow(() => cache.Put(1, 1)); // Outside the transaction the client must be usable again.
+ Assert.IsNull(Client.GetTransactions().Tx);
+ }
+
+ /// <summary>
+ /// Gets or creates transactional cache
+ /// </summary>
+ private ICacheClient<int, int> GetTransactionalCache()
+ {
+ return Client.GetOrCreateCache<int, int>(new CacheClientConfiguration
+ {
+ Name = TestUtils.TestName,
+ AtomicityMode = CacheAtomicityMode.Transactional
+ });
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxTest.cs
new file mode 100644
index 0000000..40f4254
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheClientPartitionedTxTest.cs
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System.Transactions;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests client transactions for multiple nodes with partition awareness.
+ /// </summary>
+ public class CacheClientPartitionedTxTest : CacheClientAbstractTxTest
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="CacheClientPartitionedTxTest"/> class.
+ /// </summary>
+ public CacheClientPartitionedTxTest() : base(3, true)
+ {
+ // No-op.
+ }
+
+ /// <summary>
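+ /// Tests that puts to keys picked for two different nodes are visible inside an explicit transaction and discarded when it is disposed without commit.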
+ /// </summary>
+ [Test]
+ public void TestTxPartitioned()
+ {
+ var cache = GetTransactionalCache();
+ var ignite1 = GetIgnite();
+ var ignite2 = GetIgnite(1);
+ var key1 = TestUtils.GetPrimaryKey(ignite1, GetCacheName());
+ var key2 = TestUtils.GetPrimaryKey(ignite2, GetCacheName());
+
+ cache.Put(key1, 1);
+ cache.Put(key2, 2);
+
+ using (Client.GetTransactions().TxStart())
+ {
+ cache.Put(key1, 10);
+ cache.Put(key2, 20);
+
+ Assert.AreEqual(10, cache.Get(key1));
+ Assert.AreEqual(20, cache.Get(key2));
+ }
+
+ Assert.AreEqual(1, cache.Get(key1));
+ Assert.AreEqual(2, cache.Get(key2));
+ }
+
+ /// <summary>
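+ /// Tests that puts to keys picked for two different nodes are visible inside a transaction scope and discarded when the scope is disposed without Complete().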
+ /// </summary>
+ [Test]
+ public void TestTransactionScopePartitioned()
+ {
+ var cache = GetTransactionalCache();
+ var ignite1 = GetIgnite();
+ var ignite2 = GetIgnite(1);
+ var key1 = TestUtils.GetPrimaryKey(ignite1, GetCacheName());
+ var key2 = TestUtils.GetPrimaryKey(ignite2, GetCacheName());
+
+ cache.Put(key1, 1);
+ cache.Put(key2, 2);
+
+ using (new TransactionScope())
+ {
+ cache.Put(key1, 10);
+ cache.Put(key2, 20);
+
+ Assert.AreEqual(10, cache.Get(key1));
+ Assert.AreEqual(20, cache.Get(key2));
+ }
+
+ Assert.AreEqual(1, cache.Get(key1));
+ Assert.AreEqual(2, cache.Get(key2));
+ }
+
+ /** <inheritdoc /> */
+ protected override string GetCacheName()
+ {
+ return "partitioned_" + base.GetCacheName();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
index ca421e2..73dac2e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
@@ -53,6 +53,9 @@ namespace Apache.Ignite.Core.Tests.Client
/** SSL. */
private readonly bool _enableSsl;
+ /** Partition Awareness */
+ private readonly bool _enablePartitionAwareness;
+
/// <summary>
/// Initializes a new instance of the <see cref="ClientTestBase"/> class.
/// </summary>
@@ -64,10 +67,14 @@ namespace Apache.Ignite.Core.Tests.Client
/// <summary>
/// Initializes a new instance of the <see cref="ClientTestBase"/> class.
/// </summary>
- public ClientTestBase(int gridCount, bool enableSsl = false)
+ public ClientTestBase(
+ int gridCount,
+ bool enableSsl = false,
+ bool enablePartitionAwareness = false)
{
_gridCount = gridCount;
_enableSsl = enableSsl;
+ _enablePartitionAwareness = enablePartitionAwareness;
}
/// <summary>
@@ -122,6 +129,18 @@ namespace Apache.Ignite.Core.Tests.Client
public IIgniteClient Client { get; set; }
/// <summary>
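+ /// Gets the Ignite instance whose name is the given index, or the instance with <c>null</c> name when no index is given.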
+ /// </summary>
+ protected static IIgnite GetIgnite(int? idx = null)
+ {
+ if (idx == null)
+ {
+ return Ignition.GetAll().First(i => i.Name == null);
+ }
+ return Ignition.GetIgnite(idx.ToString());
+ }
+
+ /// <summary>
/// Gets the cache.
/// </summary>
protected static ICache<int, T> GetCache<T>()
@@ -178,7 +197,8 @@ namespace Apache.Ignite.Core.Tests.Client
SslProtocols = SslProtocols.Tls12
#endif
}
- : null
+ : null,
+ EnablePartitionAwareness = _enablePartitionAwareness
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
index 5c39785..3261248 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
@@ -26,8 +26,10 @@ namespace Apache.Ignite.Core.Tests.Client
using System.Xml;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Impl.Client;
using Apache.Ignite.Core.Log;
+ using Apache.Ignite.Core.Transactions;
using NUnit.Framework;
/// <summary>
@@ -100,6 +102,12 @@ namespace Apache.Ignite.Core.Tests.Client
Logger = new ConsoleLogger
{
MinLevel = LogLevel.Debug
+ },
+ TransactionConfiguration = new TransactionClientConfiguration
+ {
+ DefaultTimeout = TimeSpan.FromSeconds(1),
+ DefaultTransactionConcurrency = TransactionConcurrency.Optimistic,
+ DefaultTransactionIsolation = TransactionIsolation.Serializable
}
};
@@ -316,5 +324,25 @@ namespace Apache.Ignite.Core.Tests.Client
typeof(IgniteClientConfiguration));
}
#endif
+
+ /// <summary>
+ /// Tests <see cref="TransactionClientConfiguration"/> copy ctor.
+ /// </summary>
+ [Test]
+ public void TestTransactionConfigurationCopyCtor()
+ {
+ var sourceCfg = new TransactionClientConfiguration
+ {
+ DefaultTimeout = TimeSpan.MaxValue,
+ DefaultTransactionConcurrency = TransactionConcurrency.Pessimistic,
+ DefaultTransactionIsolation = TransactionIsolation.Serializable
+ };
+
+ var resultCfg = new TransactionClientConfiguration(sourceCfg);
+
+ Assert.AreEqual(sourceCfg.DefaultTimeout, resultCfg.DefaultTimeout);
+ Assert.AreEqual(sourceCfg.DefaultTransactionConcurrency, resultCfg.DefaultTransactionConcurrency);
+ Assert.AreEqual(sourceCfg.DefaultTransactionIsolation, resultCfg.DefaultTransactionIsolation);
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
index cfe9a8b..289987d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
@@ -40,4 +40,5 @@
</endpoints>
<logger type="Apache.Ignite.Core.Log.ConsoleLogger" minLevel="Debug" />
+ <transactionConfiguration defaultTimeout="00:00:01" defaultTransactionConcurrency="Optimistic" defaultTransactionIsolation="Serializable" />
</igniteClientConfiguration>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index b34ce8d..b4bc0fe 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Core.Tests
using Apache.Ignite.Core.Cache.Eviction;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Communication.Tcp;
using Apache.Ignite.Core.Configuration;
@@ -100,6 +101,7 @@ namespace Apache.Ignite.Core.Tests
CheckDefaultValueAttributes(new ClientConnectorConfiguration());
CheckDefaultValueAttributes(new PersistentStoreConfiguration());
CheckDefaultValueAttributes(new IgniteClientConfiguration());
+ CheckDefaultValueAttributes(new TransactionClientConfiguration());
CheckDefaultValueAttributes(new QueryIndex());
CheckDefaultValueAttributes(new DataStorageConfiguration());
CheckDefaultValueAttributes(new DataRegionConfiguration());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 99bfd45..76b77fe 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -71,6 +71,9 @@
<Compile Include="Client\ISslStreamFactory.cs" />
<Compile Include="Client\Services\IServicesClient.cs" />
<Compile Include="Client\SslStreamFactory.cs" />
+ <Compile Include="Client\Transactions\ITransactionClient.cs" />
+ <Compile Include="Client\Transactions\ITransactionsClient.cs" />
+ <Compile Include="Client\Transactions\TransactionClientConfiguration.cs" />
<Compile Include="Cluster\IBaselineNode.cs" />
<Compile Include="Common\IgniteIllegalStateException.cs" />
<Compile Include="Common\IgniteProductVersion.cs" />
@@ -128,7 +131,10 @@
<Compile Include="Impl\Client\Endpoint.cs" />
<Compile Include="Impl\Client\Services\ServicesClient.cs" />
<Compile Include="Impl\Client\SocketEndpoint.cs" />
+ <Compile Include="Impl\Client\Transactions\ClientCacheTransactionManager.cs" />
<Compile Include="Impl\Common\PlatformType.cs" />
+ <Compile Include="Impl\Client\Transactions\TransactionClient.cs" />
+ <Compile Include="Impl\Client\Transactions\TransactionsClient.cs" />
<Compile Include="Impl\Common\TaskRunner.cs" />
<Compile Include="Impl\Compute\ComputeRunner.cs" />
<Compile Include="Impl\IgniteLock.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
index 68a4c82..54c296f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IIgniteClient.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Client
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Client.Services;
+ using Apache.Ignite.Core.Client.Transactions;
/// <summary>
/// Main entry point for Ignite Thin Client APIs.
@@ -113,6 +114,14 @@ namespace Apache.Ignite.Core.Client
IBinary GetBinary();
/// <summary>
+ /// Gets Ignite transactions facade <see cref="ITransactionsClient"/>.
+ /// <para /> Transactions are bound to the thread that started the transaction. After that, each cache operation within this thread
+ /// will belong to the corresponding transaction until the transaction is committed, rolled back or closed.
+ /// <para /> Should not be used with async calls.
+ /// </summary>
+ ITransactionsClient GetTransactions();
+
+ /// <summary>
/// Gets the configuration.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
index 3689a80..0cd8e15 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
@@ -24,6 +24,7 @@ namespace Apache.Ignite.Core.Client
using System.Linq;
using System.Xml;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Client;
using Apache.Ignite.Core.Impl.Common;
@@ -119,6 +120,11 @@ namespace Apache.Ignite.Core.Client
EnablePartitionAwareness = cfg.EnablePartitionAwareness;
Logger = cfg.Logger;
ProtocolVersion = cfg.ProtocolVersion;
+
+ if (cfg.TransactionConfiguration != null)
+ {
+ TransactionConfiguration = new TransactionClientConfiguration(cfg.TransactionConfiguration);
+ }
}
/// <summary>
@@ -217,7 +223,7 @@ namespace Apache.Ignite.Core.Client
/// To do so, connection is established to every known server node at all times.
/// </summary>
public bool EnablePartitionAwareness { get; set; }
-
+
/// <summary>
/// Gets or sets the logger.
/// Default is <see cref="ConsoleLogger"/>. Set to <c>null</c> to disable logging.
@@ -225,6 +231,12 @@ namespace Apache.Ignite.Core.Client
public ILogger Logger { get; set; }
/// <summary>
+ /// Gets or sets the transaction configuration.
+ /// See <see cref="ITransactionsClient"/>, <see cref="IIgniteClient.GetTransactions"/>.
+ /// </summary>
+ public TransactionClientConfiguration TransactionConfiguration { get; set; }
+
+ /// <summary>
/// Gets or sets custom binary processor. Internal property for tests.
/// </summary>
internal IBinaryProcessor BinaryProcessor { get; set; }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionClient.cs
new file mode 100644
index 0000000..8f5ff27
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionClient.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.Transactions
+{
+ using System;
+ using Apache.Ignite.Core.Transactions;
+
+ /// <summary>
+ /// Thin client transaction.
+ /// </summary>
+ public interface ITransactionClient : IDisposable
+ {
+ /// <summary>
+ /// Gets the transaction concurrency mode.
+ /// </summary>
+ TransactionConcurrency Concurrency { get; }
+
+ /// <summary>
+ /// Gets the transaction isolation level.
+ /// </summary>
+ TransactionIsolation Isolation { get; }
+
+ /// <summary>
+ /// Gets the timeout for this transaction. If transaction times
+ /// out prior to its completion, an exception will be thrown.
+ /// <see cref="TimeSpan.Zero" /> for infinite timeout.
+ /// </summary>
+ TimeSpan Timeout { get; }
+
+ /// <summary>
+ /// Gets the transaction label.
+ /// </summary>
+ string Label { get; }
+
+ /// <summary>
+ /// Commits this transaction.
+ /// </summary>
+ void Commit();
+
+ /// <summary>
+ /// Rolls back this transaction.
+ /// </summary>
+ void Rollback();
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionsClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionsClient.cs
new file mode 100644
index 0000000..4cf812f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/ITransactionsClient.cs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.Transactions
+{
+ using System;
+ using System.Transactions;
+ using Apache.Ignite.Core.Transactions;
+
+ /// <summary>
+ /// Ignite Thin Client transactions facade.
+ /// <para />
+ /// Transactions are bound to the thread that started the transaction. After that, each cache operation within this thread
+ /// will belong to the corresponding transaction until the transaction is committed, rolled back or closed.
+ /// <para />
+ /// Should not be used with async calls.
+ /// <example>
+ /// You can use cache transactions as follows:
+ /// <code>
+ /// using (var tx = igniteClient.GetTransactions().TxStart())
+ /// {
+ /// int v1 = cache.Get("k1");
+ ///
+ /// // Check if v1 satisfies some condition before doing a put.
+ /// if (v1 > 0)
+ /// cache.Put("k1", 2);
+ ///
+ /// cache.Remove("k2");
+ ///
+ /// // Commit the transaction.
+ /// tx.Commit();
+ /// }
+ /// </code>
+ /// </example>
+ ///
+ /// Alternatively, <see cref="TransactionScope"/> can be used to start Ignite transactions.
+ /// <example>
+ /// <code>
+ /// using (var ts = new TransactionScope())
+ /// {
+ /// int v1 = cache.Get("k1");
+ ///
+ /// // Check if v1 satisfies some condition before doing a put.
+ /// if (v1 > 0)
+ /// cache.Put("k1", 2);
+ ///
+ /// cache.Remove("k2");
+ ///
+ /// // Commit the transaction.
+ /// ts.Complete();
+ /// }
+ /// </code>
+ /// </example>
+ /// </summary>
+ public interface ITransactionsClient
+ {
+ /// <summary>
+ /// Gets transaction started by this thread or null if this thread does not have a transaction.
+ /// </summary>
+ /// <value>
+ /// Transaction started by this thread or null if this thread does not have a transaction.
+ /// </value>
+ ITransactionClient Tx { get; }
+
+ /// <summary>
+ /// Gets the default transaction concurrency.
+ /// </summary>
+ TransactionConcurrency DefaultTransactionConcurrency { get; }
+
+ /// <summary>
+ /// Gets the default transaction isolation.
+ /// </summary>
+ TransactionIsolation DefaultTransactionIsolation { get; }
+
+ /// <summary>
+ /// Gets the default transaction timeout.
+ /// </summary>
+ TimeSpan DefaultTimeout { get; }
+
+ /// <summary>
+ /// Starts a new transaction with the default isolation level, concurrency and timeout.
+ /// <para />
+ /// Default values for transaction isolation level, concurrency and timeout can be configured via
+ /// <see cref="TransactionClientConfiguration" />.
+ /// <para />
+ /// Should not be used with async calls.
+ /// </summary>
+ /// <returns>New transaction.</returns>
+ ITransactionClient TxStart();
+
+ /// <summary>
+ /// Starts a new transaction with the specified concurrency and isolation.
+ /// <para />
+ /// Should not be used with async calls.
+ /// </summary>
+ /// <param name="concurrency">Concurrency.</param>
+ /// <param name="isolation">Isolation.</param>
+ /// <returns>New transaction.</returns>
+ ITransactionClient TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation);
+
+ /// <summary>
+ /// Starts a new transaction with the specified concurrency, isolation and timeout.
+ /// <para />
+ /// Should not be used with async calls.
+ /// </summary>
+ /// <param name="concurrency">Concurrency.</param>
+ /// <param name="isolation">Isolation.</param>
+ /// <param name="timeout">Timeout. <see cref="TimeSpan.Zero"/> for indefinite timeout.</param>
+ /// <returns>New transaction.</returns>
+ ITransactionClient TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation,
+ TimeSpan timeout);
+
+ /// <summary>
+ /// Returns instance of <see cref="ITransactionsClient" /> to mark a transaction instance with a special label.
+ /// The label is helpful for diagnostics and is exposed to some diagnostic tools like
+ /// SYS.TRANSACTIONS system view, control.sh commands, JMX TransactionsMXBean,
+ /// long-running transactions dump in logs
+ /// and <see cref="ITransaction.Label" /> via <see cref="ITransactions.GetLocalActiveTransactions" />.
+ /// </summary>
+ /// <param name="label">Label.</param>
+ /// <returns>
+ /// <see cref="T:Apache.Ignite.Core.Client.Transactions.ITransactionsClient" />
+ /// </returns>
+ ITransactionsClient WithLabel(string label);
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/TransactionClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/TransactionClientConfiguration.cs
new file mode 100644
index 0000000..476b60b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/Transactions/TransactionClientConfiguration.cs
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Client.Transactions
+{
+ using System;
+ using System.ComponentModel;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Transactions;
+
+ /// <summary>
+ /// Thin client transactions configuration.
+ /// Default values specified here will be used by <see cref="ITransactionsClient.TxStart()"/>.
+ /// </summary>
+ public class TransactionClientConfiguration
+ {
+ /// <summary> The default value for <see cref="DefaultTransactionConcurrency"/> property. </summary>
+ public const TransactionConcurrency DefaultDefaultTransactionConcurrency = TransactionConcurrency.Pessimistic;
+
+ /// <summary> The default value for <see cref="DefaultTransactionIsolation"/> property. </summary>
+ public const TransactionIsolation DefaultDefaultTransactionIsolation = TransactionIsolation.RepeatableRead;
+
+ /// <summary> The default value for <see cref="DefaultTimeout"/> property. </summary>
+ public static readonly TimeSpan DefaultDefaultTimeout = TimeSpan.Zero;
+
+ /// <summary>
+ /// Gets or sets the cache transaction concurrency to use when one is not explicitly specified.
+ /// </summary>
+ [DefaultValue(DefaultDefaultTransactionConcurrency)]
+ public TransactionConcurrency DefaultTransactionConcurrency { get; set; }
+
+ /// <summary>
+ /// Gets or sets the cache transaction isolation to use when one is not explicitly specified.
+ /// </summary>
+ [DefaultValue(DefaultDefaultTransactionIsolation)]
+ public TransactionIsolation DefaultTransactionIsolation { get; set; }
+
+ /// <summary>
+ /// Gets or sets the cache transaction timeout to use when one is not explicitly specified.
+ /// <see cref="TimeSpan.Zero"/> for infinite timeout.
+ /// </summary>
+ [DefaultValue(typeof(TimeSpan), "00:00:00")]
+ public TimeSpan DefaultTimeout { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransactionClientConfiguration" /> class.
+ /// </summary>
+ public TransactionClientConfiguration()
+ {
+ DefaultTransactionConcurrency = DefaultDefaultTransactionConcurrency;
+ DefaultTransactionIsolation = DefaultDefaultTransactionIsolation;
+ DefaultTimeout = DefaultDefaultTimeout;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="TransactionClientConfiguration" /> class.
+ /// </summary>
+ /// <param name="cfg">The configuration to copy.</param>
+ public TransactionClientConfiguration(TransactionClientConfiguration cfg)
+ {
+ IgniteArgumentCheck.NotNull(cfg, "configuration");
+
+ DefaultTransactionConcurrency = cfg.DefaultTransactionConcurrency;
+ DefaultTransactionIsolation = cfg.DefaultTransactionIsolation;
+ DefaultTimeout = cfg.DefaultTimeout;
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
index ba64aca..f78508e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
@@ -24,6 +24,21 @@
xmlns:mstns="http://ignite.apache.org/schema/dotnet/IgniteClientConfigurationSection"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <xs:simpleType name="transactionIsolation" final="restriction">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="ReadCommitted" />
+ <xs:enumeration value="RepeatableRead" />
+ <xs:enumeration value="Serializable" />
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:simpleType name="transactionConcurrency" final="restriction">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="Optimistic" />
+ <xs:enumeration value="Pessimistic" />
+ </xs:restriction>
+ </xs:simpleType>
+
<xs:element name="igniteClientConfiguration">
<xs:annotation>
<xs:documentation>Ignite thin client configuration root.</xs:documentation>
@@ -228,6 +243,28 @@
</xs:attribute>
</xs:complexType>
</xs:element>
+ <xs:element name="transactionConfiguration" minOccurs="0" maxOccurs="1">
+ <xs:annotation>
+ <xs:documentation>Transactions configuration.</xs:documentation>
+ </xs:annotation>
+ <xs:complexType>
+ <xs:attribute name="defaultTimeout" type="xs:string">
+ <xs:annotation>
+ <xs:documentation>Default timeout.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="defaultTransactionConcurrency" type="transactionConcurrency">
+ <xs:annotation>
+ <xs:documentation>Default cache transaction concurrency control.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ <xs:attribute name="defaultTransactionIsolation" type="transactionIsolation">
+ <xs:annotation>
+ <xs:documentation>Default cache transaction isolation level.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
+ </xs:complexType>
+ </xs:element>
</xs:all>
<xs:attribute name="host" type="xs:string" use="required">
<xs:annotation>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index f240922..2f27bcf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -50,8 +50,9 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
internal sealed class CacheClient<TK, TV> : ICacheClient<TK, TV>, ICacheInternal
{
/// <summary>
- /// Additional flag values for cache operations.
+ /// Additional flags values for cache operations.
/// </summary>
+ [Flags]
private enum ClientCacheRequestFlag : byte
{
/// <summary>
@@ -145,6 +146,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheGet, key, ctx => UnmarshalNotNull<TV>(ctx));
}
@@ -162,6 +165,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ _ignite.Transactions.StartTxIfNeeded();
+
var res = DoOutInOpAffinity(ClientOp.CacheGet, key, UnmarshalCacheResult<TV>);
value = res.Value;
@@ -183,6 +188,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOp(ClientOp.CacheGetAll, ctx => ctx.Writer.WriteEnumerable(keys),
s => ReadCacheEntries(s.Stream));
}
@@ -202,6 +209,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
DoOutInOpAffinity<object>(ClientOp.CachePut, key, val, null);
}
@@ -301,6 +310,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheGetAndPut, key, val, UnmarshalCacheResult<TV>);
}
@@ -319,6 +330,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheGetAndReplace, key, val, UnmarshalCacheResult<TV>);
}
@@ -336,6 +349,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheGetAndRemove, key, UnmarshalCacheResult<TV>);
}
@@ -353,6 +368,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, ctx => ctx.Stream.ReadBool());
}
@@ -371,6 +388,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheGetAndPutIfAbsent, key, val, UnmarshalCacheResult<TV>);
}
@@ -389,6 +408,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, ctx => ctx.Stream.ReadBool());
}
@@ -408,6 +429,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, ctx =>
{
ctx.Writer.WriteObjectDetached(key);
@@ -436,6 +459,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
+ _ignite.Transactions.StartTxIfNeeded();
+
DoOutOp(ClientOp.CachePutAll, ctx => ctx.Writer.WriteDictionary(vals));
}
@@ -496,6 +521,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, ctx => ctx.Stream.ReadBool());
}
@@ -513,6 +540,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ _ignite.Transactions.StartTxIfNeeded();
+
return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, ctx => ctx.Stream.ReadBool());
}
@@ -530,6 +559,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
+ _ignite.Transactions.StartTxIfNeeded();
+
DoOutOp(ClientOp.CacheRemoveKeys, ctx => ctx.Writer.WriteEnumerable(keys));
}
@@ -544,6 +575,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
/** <inheritDoc /> */
public void RemoveAll()
{
+ _ignite.Transactions.StartTxIfNeeded();
+
DoOutOp(ClientOp.CacheRemoveAll);
}
@@ -774,14 +807,31 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
ctx.Stream.WriteInt(_id);
+ var flags = ClientCacheRequestFlag.None;
if (_expiryPolicy != null)
{
ctx.Features.ValidateWithExpiryPolicyFlag();
- ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.WithExpiryPolicy);
+ flags = flags | ClientCacheRequestFlag.WithExpiryPolicy;
+ }
+
+ var tx = _ignite.Transactions.Tx;
+ if (tx != null)
+ {
+ flags |= ClientCacheRequestFlag.WithTransactional;
+ }
+
+ ctx.Stream.WriteByte((byte) flags);
+
+ if ((flags & ClientCacheRequestFlag.WithExpiryPolicy) == ClientCacheRequestFlag.WithExpiryPolicy)
+ {
ExpiryPolicySerializer.WritePolicy(ctx.Writer, _expiryPolicy);
}
- else
- ctx.Stream.WriteByte((byte) ClientCacheRequestFlag.None); // Flags (skipStore, etc).
+
+ if ((flags & ClientCacheRequestFlag.WithTransactional) == ClientCacheRequestFlag.WithTransactional)
+ {
+ // ReSharper disable once PossibleNullReferenceException flag is set only if tx != null
+ ctx.Writer.WriteInt(tx.Id);
+ }
if (writeAction != null)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
index 5e35711..74680b9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
@@ -31,6 +31,7 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Client.Cache;
+ using Apache.Ignite.Core.Impl.Client.Transactions;
using Apache.Ignite.Core.Impl.Log;
using Apache.Ignite.Core.Log;
@@ -54,6 +55,9 @@ namespace Apache.Ignite.Core.Impl.Client
/** Marshaller. */
private readonly Marshaller _marsh;
+ /** Transactions. */
+ private readonly TransactionsClient _transactions;
+
/** Endpoints with corresponding hosts - from config. */
private readonly List<SocketEndpoint> _endPoints;
@@ -94,14 +98,20 @@ namespace Apache.Ignite.Core.Impl.Client
/// Initializes a new instance of the <see cref="ClientFailoverSocket"/> class.
/// </summary>
/// <param name="config">The configuration.</param>
- /// <param name="marsh"></param>
- public ClientFailoverSocket(IgniteClientConfiguration config, Marshaller marsh)
+ /// <param name="marsh">The marshaller.</param>
+ /// <param name="transactions">The transactions.</param>
+ public ClientFailoverSocket(
+ IgniteClientConfiguration config,
+ Marshaller marsh,
+ TransactionsClient transactions)
{
Debug.Assert(config != null);
Debug.Assert(marsh != null);
+ Debug.Assert(transactions != null);
_config = config;
_marsh = marsh;
+ _transactions = transactions;
#pragma warning disable 618 // Type or member is obsolete
if (config.Host == null && (config.Endpoints == null || config.Endpoints.Count == 0))
@@ -247,6 +257,12 @@ namespace Apache.Ignite.Core.Impl.Client
/// </summary>
private ClientSocket GetSocket()
{
+ var tx = _transactions.Tx;
+ if (tx != null)
+ {
+ return tx.Socket;
+ }
+
lock (_socketLock)
{
ThrowIfDisposed();
@@ -269,6 +285,12 @@ namespace Apache.Ignite.Core.Impl.Client
return null;
}
+ // Transactional operation should be executed on node started the transaction.
+ if (_transactions.Tx != null)
+ {
+ return null;
+ }
+
UpdateDistributionMap(cacheId);
var distributionMap = _distributionMap;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFeatures.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFeatures.cs
index 06f0d8a..d228fe7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFeatures.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFeatures.cs
@@ -44,7 +44,9 @@ namespace Apache.Ignite.Core.Impl.Client
{ClientOp.ClusterIsActive, new ClientProtocolVersion(1, 5, 0)},
{ClientOp.ClusterChangeState, new ClientProtocolVersion(1, 5, 0)},
{ClientOp.ClusterChangeWalState, new ClientProtocolVersion(1, 5, 0)},
- {ClientOp.ClusterGetWalState, new ClientProtocolVersion(1, 5, 0)}
+ {ClientOp.ClusterGetWalState, new ClientProtocolVersion(1, 5, 0)},
+ {ClientOp.TxStart, new ClientProtocolVersion(1, 5, 0)},
+ {ClientOp.TxEnd, new ClientProtocolVersion(1, 5, 0)},
};
/** */
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
index 238058f..1a9dd94 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
@@ -72,6 +72,10 @@ namespace Apache.Ignite.Core.Impl.Client
BinaryTypeGet = 3002,
BinaryTypePut = 3003,
+ // Transactions
+ TxStart = 4000,
+ TxEnd = 4001,
+
// Cluster.
ClusterIsActive = 5000,
ClusterChangeState = 5001,
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index cad5ea1..7dee984 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -30,6 +30,7 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.Compute;
using Apache.Ignite.Core.Client.Services;
+ using Apache.Ignite.Core.Client.Transactions;
using Apache.Ignite.Core.Datastream;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Cache;
@@ -38,6 +39,7 @@ namespace Apache.Ignite.Core.Impl.Client
using Apache.Ignite.Core.Impl.Client.Cluster;
using Apache.Ignite.Core.Impl.Client.Compute;
using Apache.Ignite.Core.Impl.Client.Services;
+ using Apache.Ignite.Core.Impl.Client.Transactions;
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Handle;
@@ -63,6 +65,9 @@ namespace Apache.Ignite.Core.Impl.Client
/** Configuration. */
private readonly IgniteClientConfiguration _configuration;
+ /** Transactions. */
+ private readonly TransactionsClient _transactions;
+
/** Node info cache. */
private readonly ConcurrentDictionary<Guid, IClientClusterNode> _nodes =
new ConcurrentDictionary<Guid, IClientClusterNode>();
@@ -91,7 +96,9 @@ namespace Apache.Ignite.Core.Impl.Client
Ignite = this
};
- _socket = new ClientFailoverSocket(_configuration, _marsh);
+ _transactions = new TransactionsClient(this, clientConfiguration.TransactionConfiguration);
+
+ _socket = new ClientFailoverSocket(_configuration, _marsh, _transactions);
_binProc = _configuration.BinaryProcessor ?? new BinaryProcessorClient(_socket);
@@ -118,6 +125,7 @@ namespace Apache.Ignite.Core.Impl.Client
public void Dispose()
{
_socket.Dispose();
+ _transactions.Dispose();
}
/** <inheritDoc /> */
@@ -204,6 +212,18 @@ namespace Apache.Ignite.Core.Impl.Client
}
/** <inheritDoc /> */
+ ITransactionsClient IIgniteClient.GetTransactions()
+ {
+ // Explicit interface implementation: hands out the transactions facade typed as the public interface.
+ return _transactions;
+ }
+
+ /** Internal transactions representation. */
+ internal TransactionsClient Transactions
+ {
+ // Returns the same field as IIgniteClient.GetTransactions, typed as the concrete class.
+ get { return _transactions; }
+ }
+
+ /** <inheritDoc /> */
public CacheAffinityImpl GetAffinity(string cacheName)
{
throw GetClientNotSupportedException();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/ClientCacheTransactionManager.cs
similarity index 50%
copy from modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
copy to modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/ClientCacheTransactionManager.cs
index 9f92a12..f08e703 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/ClientCacheTransactionManager.cs
@@ -15,43 +15,41 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Core.Impl.Transactions
+namespace Apache.Ignite.Core.Impl.Client.Transactions
{
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Transactions;
- using Apache.Ignite.Core.Transactions;
+ using Apache.Ignite.Core.Client.Transactions;
+ using Apache.Ignite.Core.Impl.Transactions;
/// <summary>
/// Cache transaction enlistment manager,
/// allows using Ignite transactions via standard <see cref="TransactionScope"/>.
/// </summary>
- // ReSharper disable once ClassNeverInstantiated.Global
- internal class CacheTransactionManager : IEnlistmentNotification
+ internal class ClientCacheTransactionManager : ISinglePhaseNotification, IDisposable
{
/** */
- private readonly ITransactions _transactions;
+ private ITransactionsClient _transactions;
/** */
- private static readonly ThreadLocal<Enlistment> Enlistment = new ThreadLocal<Enlistment>();
+ private ThreadLocal<Enlistment> _enlistment = new ThreadLocal<Enlistment>();
/// <summary>
- /// Initializes a new instance of <see cref="CacheTransactionManager"/> class.
+ /// Initializes a new instance of <see cref="ClientCacheTransactionManager"/> class.
/// </summary>
/// <param name="transactions">Transactions.</param>
- public CacheTransactionManager(ITransactions transactions)
+ public ClientCacheTransactionManager(ITransactionsClient transactions)
{
- Debug.Assert(transactions != null);
-
_transactions = transactions;
}
/// <summary>
/// If ambient transaction is present, starts an Ignite transaction and enlists it.
/// </summary>
- public void StartTx()
+ public void StartTxIfNeeded()
{
if (_transactions.Tx != null)
{
@@ -65,90 +63,56 @@ namespace Apache.Ignite.Core.Impl.Transactions
if (ambientTx != null && ambientTx.TransactionInformation.Status == TransactionStatus.Active)
{
- _transactions.TxStart(_transactions.DefaultTransactionConcurrency,
- ConvertTransactionIsolation(ambientTx.IsolationLevel));
-
- Enlistment.Value = ambientTx.EnlistVolatile(this, EnlistmentOptions.None);
+ _transactions.TxStart(_transactions.DefaultTransactionConcurrency,
+ CacheTransactionManager.ConvertTransactionIsolation(ambientTx.IsolationLevel),
+ _transactions.DefaultTimeout);
+ _enlistment.Value = ambientTx.EnlistVolatile(this, EnlistmentOptions.None);
}
}
- /// <summary>
- /// Gets a value indicating whether there is an active transaction.
- /// </summary>
- public bool IsInTx()
- {
- return _transactions.Tx != null;
- }
-
- /** <inheritdoc /> */
- [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
+ /** <inheritDoc /> */
void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
{
- Debug.Assert(preparingEnlistment != null);
-
- var igniteTx = _transactions.Tx;
-
- if (igniteTx != null && Enlistment.Value != null)
- {
- try
- {
- ((Transaction) igniteTx).Prepare();
- }
- catch (Exception)
- {
- // Prepare failed - release Ignite transaction (we won't have another chance to do this).
- igniteTx.Dispose();
- throw;
- }
- }
-
preparingEnlistment.Prepared();
}
- /** <inheritdoc /> */
- [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
+ /** <inheritDoc /> */
void IEnlistmentNotification.Commit(Enlistment enlistment)
{
Debug.Assert(enlistment != null);
var igniteTx = _transactions.Tx;
- if (igniteTx != null && Enlistment.Value != null)
+ if (igniteTx != null && _enlistment.Value != null)
{
- Debug.Assert(ReferenceEquals(enlistment, Enlistment.Value));
+ Debug.Assert(ReferenceEquals(enlistment, _enlistment.Value));
igniteTx.Commit();
-
igniteTx.Dispose();
-
- Enlistment.Value = null;
+ _enlistment.Value = null;
}
enlistment.Done();
}
- /** <inheritdoc /> */
- [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
+ /** <inheritDoc /> */
void IEnlistmentNotification.Rollback(Enlistment enlistment)
{
Debug.Assert(enlistment != null);
var igniteTx = _transactions.Tx;
- if (igniteTx != null && Enlistment.Value != null)
+ if (igniteTx != null && _enlistment.Value != null)
{
igniteTx.Rollback();
-
igniteTx.Dispose();
-
- Enlistment.Value = null;
+ _enlistment.Value = null;
}
enlistment.Done();
}
- /** <inheritdoc /> */
- [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
+ /** <inheritDoc /> */
void IEnlistmentNotification.InDoubt(Enlistment enlistment)
{
Debug.Assert(enlistment != null);
@@ -156,25 +120,35 @@ namespace Apache.Ignite.Core.Impl.Transactions
enlistment.Done();
}
- /// <summary>
- /// Converts the isolation level from .NET-specific to Ignite-specific.
- /// </summary>
- private static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation)
+ /** <inheritDoc /> */
+ void ISinglePhaseNotification.SinglePhaseCommit(SinglePhaseEnlistment enlistment)
+ {
+ Debug.Assert(enlistment != null);
+
+ var igniteTx = _transactions.Tx;
+
+ // Commit only when the current thread has both an open Ignite transaction and a recorded enlistment.
+ if (igniteTx != null && _enlistment.Value != null)
+ {
+ igniteTx.Commit();
+ igniteTx.Dispose();
+ // Clear the per-thread enlistment once the Ignite transaction has been completed.
+ _enlistment.Value = null;
+ }
+
+ // Reported as committed also when there was nothing to commit on this thread.
+ enlistment.Committed();
+ }
+
+ /** <inheritDoc /> */
+ [SuppressMessage("Microsoft.Usage",
+ "CA1816:CallGCSuppressFinalizeCorrectly",
+ Justification = "There is no finalizer.")]
+ public void Dispose()
{
- switch (isolation)
+ _transactions = null;
+ var localEnlistment = _enlistment;
+ if (localEnlistment != null)
{
- case IsolationLevel.Serializable:
- return TransactionIsolation.Serializable;
- case IsolationLevel.RepeatableRead:
- return TransactionIsolation.RepeatableRead;
- case IsolationLevel.ReadCommitted:
- case IsolationLevel.ReadUncommitted:
- case IsolationLevel.Snapshot:
- case IsolationLevel.Chaos:
- return TransactionIsolation.ReadCommitted;
- default:
- throw new ArgumentOutOfRangeException("isolation", isolation,
- "Unsupported transaction isolation level: " + isolation);
+ localEnlistment.Dispose();
+ _enlistment = null;
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionClient.cs
new file mode 100644
index 0000000..24553a5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionClient.cs
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Transactions
+{
+ using System;
+ using System.Globalization;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Transactions;
+ using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Ignite Thin Client transaction facade.
+    /// <para />
+    /// Holds the transaction ID and the socket passed at construction and ends the transaction
+    /// by sending <see cref="ClientOp.TxEnd"/> over that socket.
+    /// </summary>
+    internal class TransactionClient : ITransactionClient
+    {
+        /** Unique transaction ID. */
+        private readonly int _id;
+
+        /** Socket. */
+        private readonly ClientSocket _socket;
+
+        /** Transaction is closed. */
+        private bool _closed;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="id">ID.</param>
+        /// <param name="socket">Socket.</param>
+        /// <param name="concurrency">Concurrency.</param>
+        /// <param name="isolation">Isolation.</param>
+        /// <param name="timeout">Timeout.</param>
+        /// <param name="label">Label.</param>
+        public TransactionClient(int id,
+            ClientSocket socket,
+            TransactionConcurrency concurrency,
+            TransactionIsolation isolation,
+            TimeSpan timeout,
+            string label)
+        {
+            _id = id;
+            _socket = socket;
+            Concurrency = concurrency;
+            Isolation = isolation;
+            Timeout = timeout;
+            Label = label;
+        }
+
+        /** <inheritdoc /> */
+        public void Commit()
+        {
+            ThrowIfClosed();
+            Close(true);
+        }
+
+        /** <inheritdoc /> */
+        public void Rollback()
+        {
+            ThrowIfClosed();
+            Close(false);
+        }
+
+        /** <inheritdoc /> */
+        public TransactionConcurrency Concurrency { get; private set; }
+
+        /** <inheritdoc /> */
+        public TransactionIsolation Isolation { get; private set; }
+
+        /** <inheritdoc /> */
+        public TimeSpan Timeout { get; private set; }
+
+        /** <inheritdoc /> */
+        public string Label { get; private set; }
+
+        /** <inheritdoc /> */
+        public void Dispose()
+        {
+            try
+            {
+                // Ends the transaction without committing; does nothing when it is already closed.
+                Close(false);
+            }
+            catch
+            {
+                // Failures are ignored only when the socket has been disposed; anything else is rethrown.
+                if (!_socket.IsDisposed)
+                {
+                    throw;
+                }
+            }
+            finally
+            {
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        /// <summary>
+        /// Transaction Id.
+        /// </summary>
+        public int Id
+        {
+            get { return _id; }
+        }
+
+        /// <summary>
+        /// Gets the socket this transaction is bound to.
+        /// </summary>
+        /// <exception cref="IgniteClientException">The socket has been disposed.</exception>
+        public ClientSocket Socket
+        {
+            get
+            {
+                if (_socket.IsDisposed)
+                {
+                    throw new IgniteClientException("Transaction context has been lost due to connection errors.");
+                }
+
+                return _socket;
+            }
+        }
+
+        /// <summary>
+        /// Returns if transaction is closed.
+        /// </summary>
+        internal bool Closed
+        {
+            get { return _closed; }
+        }
+
+        /// <summary>
+        /// Closes the transaction. Does nothing when the transaction is already closed.
+        /// </summary>
+        /// <param name="commit">True to commit, false to end the transaction without committing.</param>
+        private void Close(bool commit)
+        {
+            if (!_closed)
+            {
+                try
+                {
+                    Socket.DoOutInOp<object>(ClientOp.TxEnd,
+                        ctx =>
+                        {
+                            ctx.Writer.WriteInt(_id);
+                            ctx.Writer.WriteBoolean(commit);
+                        },
+                        null);
+                }
+                finally
+                {
+                    // Marked closed even when the request fails, so the request is never repeated.
+                    _closed = true;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Throws an exception if transaction is closed.
+        /// </summary>
+        private void ThrowIfClosed()
+        {
+            if (_closed)
+            {
+                throw new InvalidOperationException(string.Format(CultureInfo.InvariantCulture,
+                    "Transaction {0} is closed",
+                    Id));
+            }
+        }
+
+        /// <summary>
+        /// Finalizer: best-effort attempt to end a transaction that was never disposed.
+        /// </summary>
+        ~TransactionClient()
+        {
+            try
+            {
+                Close(false);
+            }
+            catch (Exception)
+            {
+                // An exception escaping a finalizer terminates the process, so failures are swallowed here.
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionsClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionsClient.cs
new file mode 100644
index 0000000..857e9a2
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Transactions/TransactionsClient.cs
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Transactions
+{
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Transactions;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Ignite Thin Client transactions facade.
+    /// <para />
+    /// Starts transactions via <see cref="ClientOp.TxStart"/> and keeps the transaction started
+    /// by each thread in a thread-local slot.
+    /// </summary>
+    internal class TransactionsClient : ITransactionsClient, IDisposable
+    {
+        /** Default transaction configuration. */
+        private readonly TransactionClientConfiguration _cfg;
+
+        /** Transaction for this thread and client. */
+        private readonly ThreadLocal<TransactionClient> _currentTx = new ThreadLocal<TransactionClient>();
+
+        /** Ignite. */
+        private readonly IgniteClient _ignite;
+
+        /** Transaction manager. */
+        private readonly ClientCacheTransactionManager _txManager;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="ignite">Ignite.</param>
+        /// <param name="cfg">Transaction configuration; a default-constructed one is used when null.</param>
+        public TransactionsClient(IgniteClient ignite, TransactionClientConfiguration cfg)
+        {
+            _ignite = ignite;
+            _cfg = cfg ?? new TransactionClientConfiguration();
+            _txManager = new ClientCacheTransactionManager(this);
+        }
+
+        /** <inheritDoc /> */
+        [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
+            Justification = "There is no finalizer.")]
+        public void Dispose()
+        {
+            _currentTx.Dispose();
+            _txManager.Dispose();
+        }
+
+        /// <summary>
+        /// Starts ambient transaction if needed.
+        /// </summary>
+        internal void StartTxIfNeeded()
+        {
+            _txManager.StartTxIfNeeded();
+        }
+
+        /** <inheritDoc /> */
+        ITransactionClient ITransactionsClient.Tx
+        {
+            get { return Tx; }
+        }
+
+        /// <summary>
+        /// Gets transaction started by this thread or null if this thread does not have a transaction.
+        /// </summary>
+        internal TransactionClient Tx
+        {
+            get
+            {
+                var tx = _currentTx.Value;
+
+                if (tx == null)
+                {
+                    return null;
+                }
+
+                if (tx.Closed)
+                {
+                    // A closed transaction is dropped from the thread-local slot and reported as absent.
+                    _currentTx.Value = null;
+
+                    return null;
+                }
+
+                return tx;
+            }
+        }
+
+        /** <inheritDoc /> */
+        public TransactionConcurrency DefaultTransactionConcurrency
+        {
+            get { return _cfg.DefaultTransactionConcurrency; }
+        }
+
+        /** <inheritDoc /> */
+        public TransactionIsolation DefaultTransactionIsolation
+        {
+            get { return _cfg.DefaultTransactionIsolation; }
+        }
+
+        /** <inheritDoc /> */
+        public TimeSpan DefaultTimeout
+        {
+            get { return _cfg.DefaultTimeout; }
+        }
+
+        /** <inheritDoc /> */
+        public ITransactionClient TxStart()
+        {
+            return TxStart(_cfg.DefaultTransactionConcurrency, _cfg.DefaultTransactionIsolation);
+        }
+
+        /** <inheritDoc /> */
+        public ITransactionClient TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation)
+        {
+            return TxStart(concurrency, isolation, _cfg.DefaultTimeout);
+        }
+
+        /** <inheritDoc /> */
+        public ITransactionClient TxStart(TransactionConcurrency concurrency,
+            TransactionIsolation isolation,
+            TimeSpan timeout)
+        {
+            return TxStart(concurrency, isolation, timeout, null);
+        }
+
+        /** <inheritDoc /> */
+        public ITransactionsClient WithLabel(string label)
+        {
+            IgniteArgumentCheck.NotNullOrEmpty(label, "label");
+
+            return new TransactionsClientWithLabel(this, label);
+        }
+
+        /// <summary>
+        /// Starts a new transaction with the specified concurrency, isolation, timeout and label.
+        /// </summary>
+        /// <param name="concurrency">Concurrency.</param>
+        /// <param name="isolation">Isolation.</param>
+        /// <param name="timeout">Timeout.</param>
+        /// <param name="label">Label; null when the transaction is started without a label.</param>
+        /// <returns>Started transaction, also stored as the current thread's transaction.</returns>
+        /// <exception cref="IgniteClientException">The current thread already has an open transaction.</exception>
+        private ITransactionClient TxStart(TransactionConcurrency concurrency,
+            TransactionIsolation isolation,
+            TimeSpan timeout,
+            string label)
+        {
+            if (Tx != null)
+            {
+                throw new IgniteClientException("A transaction has already been started by the current thread.");
+            }
+
+            var tx = _ignite.Socket.DoOutInOp(
+                ClientOp.TxStart,
+                ctx =>
+                {
+                    ctx.Writer.WriteByte((byte) concurrency);
+                    ctx.Writer.WriteByte((byte) isolation);
+                    ctx.Writer.WriteTimeSpanAsLong(timeout);
+                    ctx.Writer.WriteString(label);
+                },
+                // The transaction keeps the socket that served the request, together with the ID read from the response.
+                ctx => new TransactionClient(
+                    ctx.Reader.ReadInt(),
+                    ctx.Socket,
+                    concurrency,
+                    isolation,
+                    timeout,
+                    label)
+            );
+
+            _currentTx.Value = tx;
+
+            return tx;
+        }
+
+        /// <summary>
+        /// Wrapper for transactions with label.
+        /// </summary>
+        private class TransactionsClientWithLabel : ITransactionsClient
+        {
+            /** Label. */
+            private readonly string _label;
+
+            /** Transactions. */
+            private readonly TransactionsClient _transactions;
+
+            /// <summary>
+            /// Client transactions wrapper with label.
+            /// </summary>
+            public TransactionsClientWithLabel(TransactionsClient transactions, string label)
+            {
+                _transactions = transactions;
+                _label = label;
+            }
+
+            /** <inheritDoc /> */
+            public ITransactionClient TxStart()
+            {
+                return TxStart(DefaultTransactionConcurrency, DefaultTransactionIsolation);
+            }
+
+            /** <inheritDoc /> */
+            public ITransactionClient TxStart(TransactionConcurrency concurrency, TransactionIsolation isolation)
+            {
+                return TxStart(concurrency, isolation, DefaultTimeout);
+            }
+
+            /** <inheritDoc /> */
+            public ITransactionClient TxStart(
+                TransactionConcurrency concurrency,
+                TransactionIsolation isolation,
+                TimeSpan timeout)
+            {
+                return _transactions.TxStart(concurrency, isolation, timeout, _label);
+            }
+
+            /** <inheritDoc /> */
+            public ITransactionsClient WithLabel(string label)
+            {
+                // Delegate to the underlying facade so a re-label gets the same null/empty validation.
+                return _transactions.WithLabel(label);
+            }
+
+            /** <inheritDoc /> */
+            public ITransactionClient Tx
+            {
+                get { return _transactions.Tx; }
+            }
+
+            /** <inheritDoc /> */
+            public TransactionConcurrency DefaultTransactionConcurrency
+            {
+                get { return _transactions.DefaultTransactionConcurrency; }
+            }
+
+            /** <inheritDoc /> */
+            public TransactionIsolation DefaultTransactionIsolation
+            {
+                get { return _transactions.DefaultTransactionIsolation; }
+            }
+
+            /** <inheritDoc /> */
+            public TimeSpan DefaultTimeout
+            {
+                get { return _transactions.DefaultTimeout; }
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
index 9f92a12..59d5c26 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
@@ -159,7 +159,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
/// <summary>
/// Converts the isolation level from .NET-specific to Ignite-specific.
/// </summary>
- private static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation)
+ internal static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation)
{
switch (isolation)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransaction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransaction.cs
index 31c2b1a..ed3591f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransaction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Transactions/ITransaction.cs
@@ -92,9 +92,7 @@ namespace Apache.Ignite.Core.Transactions
/// <example>
/// You can use cache transactions as follows:
/// <code>
- /// ICacheTx tx = cache.TxStart();
- ///
- /// try
+ /// using (var tx = cache.Ignite.GetTransactions().TxStart())
/// {
/// int v1 = cache<string, int>.Get("k1");
///
@@ -102,16 +100,12 @@ namespace Apache.Ignite.Core.Transactions
/// if (v1 > 0)
/// cache.Put<string, int>("k1", 2);
///
- /// cache.Removex("k2);
+ /// cache.Remove("k2");
///
/// // Commit the transaction.
/// tx.Commit();
/// }
- /// finally
- /// {
- /// tx.Dispose();
- /// }
- ///
+ ///
/// </code>
/// </example>
/// </summary>