You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/01/26 15:06:13 UTC
ignite git commit: IGNITE-3430 .NET: Run Ignite transactions via
standard TransactionScope API
Repository: ignite
Updated Branches:
refs/heads/master 9d64a281b -> f5375deea
IGNITE-3430 .NET: Run Ignite transactions via standard TransactionScope API
This closes #1407
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5375dee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5375dee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5375dee
Branch: refs/heads/master
Commit: f5375deead7fd40787cfb2e7130ee7c2d58820af
Parents: 9d64a28
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Jan 26 17:46:36 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Jan 26 17:46:36 2017 +0300
----------------------------------------------------------------------
.../transactions/PlatformTransactions.java | 9 +
.../Apache.Ignite.Core.Tests.csproj | 1 +
.../Cache/CacheAbstractTest.cs | 21 ++
.../Cache/CacheAbstractTransactionalTest.cs | 324 +++++++++++++++++++
.../Apache.Ignite.Core.csproj | 2 +
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 4 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 90 +++++-
.../Transactions/CacheTransactionManager.cs | 160 +++++++++
.../Impl/Transactions/Transaction.cs | 8 +
.../Impl/Transactions/TransactionImpl.cs | 13 +
.../Impl/Transactions/TransactionsImpl.cs | 11 +
11 files changed, 631 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 3cee2b1..21f71fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.util.GridConcurrentFactory;
@@ -75,6 +76,9 @@ public class PlatformTransactions extends PlatformAbstractTarget {
public static final int OP_RESET_METRICS = 11;
/** */
+ public static final int OP_PREPARE = 12;
+
+ /** */
private final IgniteTransactions txs;
/** Map with currently active transactions. */
@@ -155,6 +159,11 @@ public class PlatformTransactions extends PlatformAbstractTarget {
/** {@inheritDoc} */
@Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
+ case OP_PREPARE:
+ ((TransactionProxyImpl)tx(val)).tx().prepare();
+
+ return TRUE;
+
case OP_COMMIT:
tx(val).commit();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 08352b3..2764848 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -63,6 +63,7 @@
<Reference Include="System.Core" />
<Reference Include="System.Runtime.Serialization" />
<Reference Include="System.ServiceProcess" />
+ <Reference Include="System.Transactions" />
<Reference Include="System.XML" />
<Reference Include="System.Xml.Linq" />
</ItemGroup>
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
index 1239794..1bc0f02 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Tests.Cache
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+ using System.Transactions;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Expiry;
@@ -2646,6 +2647,26 @@ namespace Apache.Ignite.Core.Tests.Cache
Assert.AreEqual(5, cache[1]);
}
+        /// <summary>
+        /// Verifies that a cache update made inside a completed <see cref="TransactionScope"/>
+        /// is visible afterwards, whatever the atomicity mode (tx or non-tx) of the cache under test.
+        /// </summary>
+        [Test]
+        public void TestTransactionScope()
+        {
+            const int key = 1;
+            var target = Cache();
+
+            target[key] = 1;
+
+            using (var scope = new TransactionScope())
+            {
+                target[key] = 2;
+                scope.Complete();
+            }
+
+            Assert.AreEqual(2, target[key]);
+        }
+
private void TestKeepBinaryFlag(bool async)
{
var cache0 = async ? Cache().WrapAsync() : Cache();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
index 5dcc560..37a22ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTransactionalTest.cs
@@ -18,10 +18,13 @@
namespace Apache.Ignite.Core.Tests.Cache
{
using System;
+ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+ using System.Transactions;
using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Transactions;
using NUnit.Framework;
@@ -561,5 +564,326 @@ namespace Apache.Ignite.Core.Tests.Cache
var deadlockEx = aex.InnerExceptions.OfType<TransactionDeadlockException>().First();
Assert.IsTrue(deadlockEx.Message.Trim().StartsWith("Deadlock detected:"), deadlockEx.Message);
}
+
+        /// <summary>
+        /// Checks that cache updates performed inside an ambient <see cref="TransactionScope"/>
+        /// are kept when the scope completes and discarded when it does not.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeSingleCache()
+        {
+            var target = Cache();
+
+            target[1] = 1;
+            target[2] = 2;
+
+            // Completed scope: both updates must be committed.
+            using (var scope = new TransactionScope())
+            {
+                target[1] = 10;
+                target[2] = 20;
+
+                // An Ignite transaction is expected to be active on this thread by now.
+                var igniteTx = target.Ignite.GetTransactions().Tx;
+                Assert.IsNotNull(igniteTx);
+
+                scope.Complete();
+            }
+
+            Assert.AreEqual(10, target[1]);
+            Assert.AreEqual(20, target[2]);
+
+            // Scope disposed without Complete(): updates must be rolled back.
+            using (new TransactionScope())
+            {
+                target[1] = 100;
+                target[2] = 200;
+            }
+
+            Assert.AreEqual(10, target[1]);
+            Assert.AreEqual(20, target[2]);
+        }
+
+        /// <summary>
+        /// Checks commit and rollback of an ambient <see cref="TransactionScope"/>
+        /// when two transactional caches are updated inside the same scope.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeMultiCache()
+        {
+            var first = Cache();
+
+            // Second cache: name derived from the first one, explicitly transactional.
+            var secondCfg = new CacheConfiguration(first.Name + "_")
+            {
+                AtomicityMode = CacheAtomicityMode.Transactional
+            };
+            var second = GetIgnite(0).GetOrCreateCache<int, int>(secondCfg);
+
+            first[1] = 1;
+            second[1] = 2;
+
+            // Completed scope: updates in both caches must be committed.
+            using (var scope = new TransactionScope())
+            {
+                first[1] = 10;
+                second[1] = 20;
+
+                scope.Complete();
+            }
+
+            Assert.AreEqual(10, first[1]);
+            Assert.AreEqual(20, second[1]);
+
+            // Scope disposed without Complete(): updates in both caches must be rolled back.
+            using (new TransactionScope())
+            {
+                first[1] = 100;
+                second[1] = 200;
+            }
+
+            Assert.AreEqual(10, first[1]);
+            Assert.AreEqual(20, second[1]);
+        }
+
+        /// <summary>
+        /// Checks that an Ignite transaction started manually is not affected by
+        /// an ambient <see cref="TransactionScope"/> opened inside it.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeWithManualIgniteTx()
+        {
+            var target = Cache();
+            var igniteTxs = target.Ignite.GetTransactions();
+
+            target[1] = 1;
+
+            // A manually started Ignite tx is not enlisted in the ambient scope,
+            // so only the explicit Commit() below decides the outcome.
+            using (var manualTx = igniteTxs.TxStart())
+            {
+                using (new TransactionScope())
+                {
+                    target[1] = 2;
+                } // Scope is not completed, i.e. the ambient transaction is reverted.
+
+                manualTx.Commit();
+            }
+
+            Assert.AreEqual(2, target[1]);
+        }
+
+        /// <summary>
+        /// Checks that a scope created with <see cref="TransactionScopeOption.Suppress"/>
+        /// does not make cache updates transactional.
+        /// </summary>
+        [Test]
+        public void TestSuppressedTransactionScope()
+        {
+            const int key = 1;
+            var target = Cache();
+
+            target[key] = 1;
+
+            using (new TransactionScope(TransactionScopeOption.Suppress))
+            {
+                target[key] = 2;
+            }
+
+            // The scope was never completed, but the tx is suppressed, so the update stays.
+            Assert.AreEqual(2, target[key]);
+        }
+
+        /// <summary>
+        /// Checks Ignite transaction enlistment when <see cref="TransactionScope"/> instances are nested,
+        /// for both <see cref="TransactionScopeOption.Required"/> and
+        /// <see cref="TransactionScopeOption.RequiresNew"/> inner scopes.
+        /// </summary>
+        [Test]
+        public void TestNestedTransactionScope()
+        {
+            var target = Cache();
+
+            target[1] = 1;
+
+            var innerOptions = new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew};
+
+            foreach (var innerOption in innerOptions)
+            {
+                // Both scopes complete: the last written value wins.
+                using (var outer = new TransactionScope())
+                {
+                    using (var inner = new TransactionScope(innerOption))
+                    {
+                        target[1] = 2;
+                        inner.Complete();
+                    }
+
+                    target[1] = 3;
+                    outer.Complete();
+                }
+
+                Assert.AreEqual(3, target[1]);
+
+                // Neither scope completes.
+                using (new TransactionScope())
+                {
+                    using (new TransactionScope(innerOption))
+                    {
+                        target[1] = 4;
+                    }
+
+                    target[1] = 5;
+                }
+
+                // With the Required option there is a single tx that gets aborted by the inner scope,
+                // so the second put executes outside of any tx and is kept.
+                var expected = innerOption == TransactionScopeOption.Required ? 5 : 3;
+
+                Assert.AreEqual(expected, target[1], innerOption.ToString());
+            }
+        }
+
+        /// <summary>
+        /// Checks that the isolation level of the ambient <see cref="TransactionScope"/>
+        /// is translated to the expected Ignite isolation, and that the default concurrency is used.
+        /// </summary>
+        [Test]
+        public void TestTransactionScopeOptions()
+        {
+            var target = Cache();
+            var igniteTxs = target.Ignite.GetTransactions();
+
+            // Item1: .NET isolation level; Item2: Ignite isolation expected for it.
+            var expectations = new[]
+            {
+                Tuple.Create(IsolationLevel.Serializable, TransactionIsolation.Serializable),
+                Tuple.Create(IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead),
+                Tuple.Create(IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted),
+                Tuple.Create(IsolationLevel.Chaos, TransactionIsolation.ReadCommitted),
+            };
+
+            foreach (var expectation in expectations)
+            {
+                var scopeOptions = new TransactionOptions
+                {
+                    IsolationLevel = expectation.Item1
+                };
+
+                using (new TransactionScope(TransactionScopeOption.Required, scopeOptions))
+                {
+                    target[1] = 1;
+
+                    var igniteTx = igniteTxs.Tx;
+
+                    Assert.AreEqual(expectation.Item2, igniteTx.Isolation);
+                    Assert.AreEqual(igniteTxs.DefaultTransactionConcurrency, igniteTx.Concurrency);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tests all transactional operations with <see cref="TransactionScope"/>.
+        /// </summary>
+        /// <remarks>
+        /// Every asynchronous operation is waited for inside the delegate, so that it has completed
+        /// (and any failure has surfaced) before <see cref="CheckTxOp"/> leaves the enclosing scope.
+        /// </remarks>
+        [Test]
+        public void TestTransactionScopeAllOperations()
+        {
+            CheckTxOp((cache, key) => cache.Put(key, -5));
+            CheckTxOp((cache, key) => cache.PutAsync(key, -5).Wait());
+
+            CheckTxOp((cache, key) => cache.PutAll(new Dictionary<int, int> {{key, -7}}));
+            CheckTxOp((cache, key) => cache.PutAllAsync(new Dictionary<int, int> {{key, -7}}).Wait());
+
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.PutIfAbsent(key, -10);
+            });
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.PutIfAbsentAsync(key, -10).Wait();
+            });
+
+            CheckTxOp((cache, key) => cache.GetAndPut(key, -9));
+            CheckTxOp((cache, key) => cache.GetAndPutAsync(key, -9).Wait());
+
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.GetAndPutIfAbsent(key, -10);
+            });
+            CheckTxOp((cache, key) =>
+            {
+                cache.Remove(key);
+                cache.GetAndPutIfAbsentAsync(key, -10).Wait();
+            });
+
+            CheckTxOp((cache, key) => cache.GetAndRemove(key));
+            CheckTxOp((cache, key) => cache.GetAndRemoveAsync(key).Wait());
+
+            CheckTxOp((cache, key) => cache.GetAndReplace(key, -11));
+            CheckTxOp((cache, key) => cache.GetAndReplaceAsync(key, -11).Wait());
+
+            CheckTxOp((cache, key) => cache.Invoke(key, new AddProcessor(), 1));
+            CheckTxOp((cache, key) => cache.InvokeAsync(key, new AddProcessor(), 1).Wait());
+
+            CheckTxOp((cache, key) => cache.InvokeAll(new[] {key}, new AddProcessor(), 1));
+            CheckTxOp((cache, key) => cache.InvokeAllAsync(new[] {key}, new AddProcessor(), 1).Wait());
+
+            CheckTxOp((cache, key) => cache.Remove(key));
+            CheckTxOp((cache, key) => cache.RemoveAsync(key).Wait());
+
+            CheckTxOp((cache, key) => cache.RemoveAll(new[] { key }));
+            CheckTxOp((cache, key) => cache.RemoveAllAsync(new[] { key }).Wait());
+
+            CheckTxOp((cache, key) => cache.Replace(key, 100));
+            CheckTxOp((cache, key) => cache.ReplaceAsync(key, 100).Wait());
+
+            CheckTxOp((cache, key) => cache.Replace(key, cache[key], 100));
+            CheckTxOp((cache, key) => cache.ReplaceAsync(key, cache[key], 100).Wait());
+        }
+
+        /// <summary>
+        /// Runs the given cache operation inside rolled back and committed
+        /// <see cref="TransactionScope"/> instances and verifies that its effect
+        /// is discarded or applied accordingly.
+        /// </summary>
+        /// <param name="act">Operation under test; receives the cache and the key to operate on.</param>
+        private void CheckTxOp(Action<ICache<int, int>, int> act)
+        {
+            var target = Cache();
+
+            target[1] = 1;
+            target[2] = 2;
+
+            // Rollback, one key touched.
+            using (new TransactionScope())
+            {
+                act(target, 1);
+
+                var igniteTx = target.Ignite.GetTransactions().Tx;
+
+                Assert.IsNotNull(igniteTx, "Transaction has not started.");
+            }
+
+            Assert.AreEqual(1, target[1]);
+            Assert.AreEqual(2, target[2]);
+
+            // Rollback, both keys touched.
+            using (new TransactionScope())
+            {
+                act(target, 1);
+                act(target, 2);
+            }
+
+            Assert.AreEqual(1, target[1]);
+            Assert.AreEqual(2, target[2]);
+
+            // Commit, one key touched: key 1 must be removed or changed, key 2 must be intact.
+            using (var scope = new TransactionScope())
+            {
+                act(target, 1);
+                scope.Complete();
+            }
+
+            Assert.IsFalse(target.ContainsKey(1) && target[1] == 1);
+            Assert.AreEqual(2, target[2]);
+
+            // Commit, both keys touched: both must be removed or changed.
+            using (var scope = new TransactionScope())
+            {
+                act(target, 1);
+                act(target, 2);
+                scope.Complete();
+            }
+
+            Assert.IsFalse(target.ContainsKey(1) && target[1] == 1);
+            Assert.IsFalse(target.ContainsKey(2) && target[2] == 2);
+        }
+
+        /// <summary>
+        /// Entry processor that adds its argument to the current entry value.
+        /// </summary>
+        [Serializable]
+        private class AddProcessor : ICacheEntryProcessor<int, int, int, int>
+        {
+            /// <summary>
+            /// Increases the entry value by <paramref name="arg"/> and returns <paramref name="arg"/>.
+            /// </summary>
+            public int Process(IMutableCacheEntry<int, int> entry, int arg)
+            {
+                var updated = entry.Value + arg;
+
+                entry.Value = updated;
+
+                return arg;
+            }
+        }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index d58c872..673449e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -87,6 +87,7 @@
<Reference Include="System" />
<Reference Include="System.configuration" />
<Reference Include="System.Core" />
+ <Reference Include="System.Transactions" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
@@ -189,6 +190,7 @@
<Compile Include="Impl\Binary\SerializableSerializer.cs" />
<Compile Include="Impl\Binary\BinaryWriterExtensions.cs" />
<Compile Include="Impl\Cache\Affinity\AffinityFunctionBase.cs" />
+ <Compile Include="Impl\Transactions\CacheTransactionManager.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" />
<Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" />
<Compile Include="Impl\Cache\ICacheLockInternal.cs" />
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
index 2a0ec86..50938e1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/ICache.cs
@@ -656,14 +656,14 @@ namespace Apache.Ignite.Core.Cache
/// <summary>
/// Removes all mappings from cache.
/// If write-through is enabled, the value will be removed from store.
- /// This method is transactional and will enlist the entry into ongoing transaction if there is one.
+ /// This method is not transactional.
/// </summary>
void RemoveAll();
/// <summary>
/// Removes all mappings from cache.
/// If write-through is enabled, the value will be removed from store.
- /// This method is transactional and will enlist the entry into ongoing transaction if there is one.
+ /// This method is not transactional.
/// </summary>
Task RemoveAllAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index b8dc6cb..a387e1b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl.Cache
using System;
using System.Collections;
using System.Collections.Generic;
+ using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
@@ -36,6 +37,7 @@ namespace Apache.Ignite.Core.Impl.Cache
using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
using Apache.Ignite.Core.Impl.Cluster;
using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Transactions;
using Apache.Ignite.Core.Impl.Unmanaged;
/// <summary>
@@ -56,6 +58,9 @@ namespace Apache.Ignite.Core.Impl.Cache
/** Flag: no-retries.*/
private readonly bool _flagNoRetries;
+ /** Transaction manager. */
+ private readonly CacheTransactionManager _txManager;
+
/// <summary>
/// Constructor.
/// </summary>
@@ -68,10 +73,16 @@ namespace Apache.Ignite.Core.Impl.Cache
public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh,
bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries) : base(target, marsh)
{
+ Debug.Assert(grid != null);
+
_ignite = grid;
_flagSkipStore = flagSkipStore;
_flagKeepBinary = flagKeepBinary;
_flagNoRetries = flagNoRetries;
+
+ _txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional
+ ? new CacheTransactionManager(grid.GetTransactions())
+ : null;
}
/** <inheritDoc /> */
@@ -416,9 +427,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public void Put(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
DoOutOp(CacheOp.Put, key, val);
}
@@ -428,6 +440,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync(CacheOp.PutAsync, key, val);
}
@@ -437,6 +451,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
}
@@ -446,6 +462,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync(CacheOp.GetAndPutAsync, w =>
{
w.WriteObject(key);
@@ -457,9 +475,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public CacheResult<TV> GetAndReplace(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutInOpNullable(CacheOp.GetAndReplace, key, val);
}
@@ -469,6 +488,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w =>
{
w.WriteObject(key);
@@ -481,6 +502,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ StartTx();
+
return DoOutInOpNullable(CacheOp.GetAndRemove, key);
}
@@ -489,6 +512,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ StartTx();
+
return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r));
}
@@ -496,9 +521,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public bool PutIfAbsent(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOp(CacheOp.PutIfAbsent, key, val);
}
@@ -508,6 +534,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val);
}
@@ -515,9 +543,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public CacheResult<TV> GetAndPutIfAbsent(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val);
}
@@ -527,6 +556,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w =>
{
w.WriteObject(key);
@@ -538,9 +569,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public bool Replace(TK key, TV val)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOp(CacheOp.Replace2, key, val);
}
@@ -550,6 +582,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val);
}
@@ -557,11 +591,11 @@ namespace Apache.Ignite.Core.Impl.Cache
public bool Replace(TK key, TV oldVal, TV newVal)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
-
IgniteArgumentCheck.NotNull(newVal, "newVal");
+ StartTx();
+
return DoOutOp(CacheOp.Replace3, key, oldVal, newVal);
}
@@ -572,6 +606,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
+ StartTx();
+
return DoOutOpAsync<bool>(CacheOp.Replace3Async, w =>
{
w.WriteObject(key);
@@ -585,6 +621,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
+ StartTx();
+
DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals));
}
@@ -593,6 +631,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
+ StartTx();
+
return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals));
}
@@ -669,6 +709,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ StartTx();
+
return DoOutOp(CacheOp.RemoveObj, key);
}
@@ -677,6 +719,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
+ StartTx();
+
return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key);
}
@@ -686,6 +730,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOp(CacheOp.RemoveBool, key, val);
}
@@ -695,6 +741,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
+ StartTx();
+
return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val);
}
@@ -703,6 +751,8 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
+ StartTx();
+
DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys));
}
@@ -711,18 +761,24 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
+ StartTx();
+
return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys));
}
/** <inheritDoc /> */
public void RemoveAll()
{
+ // NOTE(review): ICache.RemoveAll() is documented in this same change as "not transactional",
+ // yet an Ignite tx is still started for an ambient TransactionScope here - confirm this is intended.
+ StartTx();
+
DoOutInOp((int) CacheOp.RemoveAll2);
}
/** <inheritDoc /> */
public Task RemoveAllAsync()
{
+ // NOTE(review): ICache.RemoveAllAsync() is documented in this same change as "not transactional",
+ // yet an Ignite tx is still started for an ambient TransactionScope here - confirm this is intended.
+ StartTx();
+
return DoOutOpAsync(CacheOp.RemoveAll2Async);
}
@@ -773,9 +829,10 @@ namespace Apache.Ignite.Core.Impl.Cache
public TRes Invoke<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
IgniteArgumentCheck.NotNull(key, "key");
-
IgniteArgumentCheck.NotNull(processor, "processor");
+ StartTx();
+
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
@@ -795,6 +852,8 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(processor, "processor");
+ StartTx();
+
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
@@ -822,9 +881,10 @@ namespace Apache.Ignite.Core.Impl.Cache
ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
IgniteArgumentCheck.NotNull(keys, "keys");
-
IgniteArgumentCheck.NotNull(processor, "processor");
+ StartTx();
+
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
@@ -842,9 +902,10 @@ namespace Apache.Ignite.Core.Impl.Cache
ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
{
IgniteArgumentCheck.NotNull(keys, "keys");
-
IgniteArgumentCheck.NotNull(processor, "processor");
+ StartTx();
+
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
@@ -1308,5 +1369,14 @@ namespace Apache.Ignite.Core.Impl.Cache
{
DoOutInOp((int) CacheOp.CloseLock, id);
}
+
+        /// <summary>
+        /// Asks the transaction manager, if this cache has one, to start an Ignite transaction.
+        /// Does nothing when no manager was created for this cache.
+        /// </summary>
+        private void StartTx()
+        {
+            var manager = _txManager;
+
+            if (manager == null)
+            {
+                return;
+            }
+
+            manager.StartTx();
+        }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
new file mode 100644
index 0000000..f5a1617
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Transactions
+{
+ using System;
+ using System.Diagnostics;
+ using System.Threading;
+ using System.Transactions;
+ using Apache.Ignite.Core.Transactions;
+
+    /// <summary>
+    /// Cache transaction enlistment manager,
+    /// allows using Ignite transactions via standard <see cref="TransactionScope"/>.
+    /// </summary>
+    /// <remarks>
+    /// The enlistment is tracked in a static thread-local slot, so at most one enlistment per thread
+    /// is active at a time, no matter how many manager instances (caches) take part.
+    /// </remarks>
+    internal class CacheTransactionManager : IEnlistmentNotification
+    {
+        /** Ignite transactions facade used to start and look up the Ignite transaction. */
+        private readonly ITransactions _transactions;
+
+        /** Enlistment made on the current thread, if any; shared by all manager instances. */
+        private static readonly ThreadLocal<Enlistment> Enlistment = new ThreadLocal<Enlistment>();
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="CacheTransactionManager"/> class.
+        /// </summary>
+        /// <param name="transactions">Transactions.</param>
+        public CacheTransactionManager(ITransactions transactions)
+        {
+            Debug.Assert(transactions != null);
+
+            _transactions = transactions;
+        }
+
+        /// <summary>
+        /// If ambient transaction is present, starts an Ignite transaction and enlists it.
+        /// </summary>
+        public void StartTx()
+        {
+            if (_transactions.Tx != null)
+            {
+                // Ignite transaction is already present.
+                // We have either enlisted it already, or it has been started manually and should not be enlisted.
+                // Java enlists existing Ignite tx in this case (see CacheJtaManager.java), but we do not.
+                return;
+            }
+
+            if (Enlistment.Value != null)
+            {
+                // We are already enlisted.
+                // .NET transaction mechanism allows nested transactions,
+                // and they can be processed differently depending on TransactionScopeOption.
+                // Ignite, however, allows only one active transaction per thread.
+                // Therefore we enlist only once on the first transaction that we encounter.
+                return;
+            }
+
+            var ambientTx = System.Transactions.Transaction.Current;
+
+            if (ambientTx != null && ambientTx.TransactionInformation.Status == TransactionStatus.Active)
+            {
+                var igniteTx = _transactions.TxStart(_transactions.DefaultTransactionConcurrency,
+                    ConvertTransactionIsolation(ambientTx.IsolationLevel));
+
+                try
+                {
+                    Enlistment.Value = ambientTx.EnlistVolatile(this, EnlistmentOptions.None);
+                }
+                catch
+                {
+                    // Enlistment failed: nobody would ever finish the Ignite tx that was just started,
+                    // so close it here instead of leaving it attached to the thread.
+                    igniteTx.Dispose();
+
+                    throw;
+                }
+            }
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            if (igniteTx != null && Enlistment.Value != null)
+            {
+                ((Transaction) igniteTx).Prepare();
+            }
+
+            preparingEnlistment.Prepared();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Commit(Enlistment enlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            try
+            {
+                if (igniteTx != null && Enlistment.Value != null)
+                {
+                    Debug.Assert(ReferenceEquals(enlistment, Enlistment.Value));
+
+                    // Dispose the Ignite tx even when Commit throws.
+                    using (igniteTx)
+                    {
+                        igniteTx.Commit();
+                    }
+                }
+            }
+            finally
+            {
+                // Always reset the slot: if it stayed set (failed commit, or Ignite tx finished
+                // manually inside the scope), StartTx would never enlist on this thread again.
+                Enlistment.Value = null;
+            }
+
+            enlistment.Done();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.Rollback(Enlistment enlistment)
+        {
+            var igniteTx = _transactions.Tx;
+
+            try
+            {
+                if (igniteTx != null && Enlistment.Value != null)
+                {
+                    // Dispose the Ignite tx even when Rollback throws.
+                    using (igniteTx)
+                    {
+                        igniteTx.Rollback();
+                    }
+                }
+            }
+            finally
+            {
+                // Always reset the slot, see Commit for the reasoning.
+                Enlistment.Value = null;
+            }
+
+            enlistment.Done();
+        }
+
+        /** <inheritdoc /> */
+        void IEnlistmentNotification.InDoubt(Enlistment enlistment)
+        {
+            // NOTE(review): the Ignite tx and the thread-local enlistment are left untouched here,
+            // because the outcome of the ambient transaction is unknown - confirm the desired handling.
+            enlistment.Done();
+        }
+
+        /// <summary>
+        /// Converts the isolation level from .NET-specific to Ignite-specific.
+        /// </summary>
+        /// <param name="isolation">Isolation level of the ambient .NET transaction.</param>
+        /// <returns>
+        /// Matching Ignite isolation for Serializable and RepeatableRead;
+        /// ReadCommitted for ReadCommitted, ReadUncommitted, Snapshot and Chaos.
+        /// </returns>
+        /// <exception cref="ArgumentOutOfRangeException">Any other isolation level.</exception>
+        private static TransactionIsolation ConvertTransactionIsolation(IsolationLevel isolation)
+        {
+            switch (isolation)
+            {
+                case IsolationLevel.Serializable:
+                    return TransactionIsolation.Serializable;
+                case IsolationLevel.RepeatableRead:
+                    return TransactionIsolation.RepeatableRead;
+                case IsolationLevel.ReadCommitted:
+                case IsolationLevel.ReadUncommitted:
+                case IsolationLevel.Snapshot:
+                case IsolationLevel.Chaos:
+                    return TransactionIsolation.ReadCommitted;
+                default:
+                    throw new ArgumentOutOfRangeException("isolation", isolation,
+                        "Unsupported transaction isolation level: " + isolation);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
index 595300c..f700bfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/Transaction.cs
@@ -142,5 +142,13 @@ namespace Apache.Ignite.Core.Impl.Transactions
{
return _tx.RemoveMeta<TV>(name);
}
+
+ /// <summary>
+ /// Executes prepare step of the two phase commit.
+ /// </summary>
+ /// <remarks>
+ /// Simply forwards the call to the wrapped <c>_tx</c> instance.
+ /// </remarks>
+ public void Prepare()
+ {
+ _tx.Prepare();
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
index d32cd3d..0b04a68 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionImpl.cs
@@ -112,6 +112,19 @@ namespace Apache.Ignite.Core.Impl.Transactions
}
/// <summary>
+ /// Executes prepare step of the two phase commit.
+ /// </summary>
+ /// <remarks>
+ /// Calls <c>ThrowIfClosed</c> and then <c>_txs.TxPrepare</c>, both while holding a lock on this instance.
+ /// </remarks>
+ public void Prepare()
+ {
+ lock (this)
+ {
+ // The closed-state check and the prepare call run under the same lock.
+ ThrowIfClosed();
+
+ _txs.TxPrepare(this);
+ }
+ }
+
+ /// <summary>
/// Commits this tx and closes it.
/// </summary>
public void Commit()
http://git-wip-us.apache.org/repos/asf/ignite/blob/f5375dee/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
index 5fa5db8..ff5d6a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/TransactionsImpl.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Transactions
using System.Threading.Tasks;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Unmanaged;
using Apache.Ignite.Core.Transactions;
@@ -63,6 +64,8 @@ namespace Apache.Ignite.Core.Impl.Transactions
/** */
private const int OpResetMetrics = 11;
+ /** */
+ private const int OpPrepare = 12;
/** */
private readonly TransactionConcurrency _dfltConcurrency;
@@ -177,6 +180,14 @@ namespace Apache.Ignite.Core.Impl.Transactions
}
/// <summary>
+ /// Executes prepare step of the two phase commit.
+ /// </summary>
+ /// <param name="tx">Transaction.</param>
+ internal void TxPrepare(TransactionImpl tx)
+ {
+ // Sends the OpPrepare operation code together with the transaction id.
+ DoOutInOp(OpPrepare, tx.Id);
+ }
+
+ /// <summary>
/// Commit transaction.
/// </summary>
/// <param name="tx">Transaction.</param>