You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2019/08/30 16:16:09 UTC
[ignite] branch master updated: GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1072bac GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys
1072bac is described below
commit 1072bacd249ccafedf15bddbcca0e4fdd3da805c
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Aug 30 19:15:53 2019 +0300
GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys
* Root cause: `StreamTransformer` used `InvokeAll`, which operates on a set of keys and therefore ignores duplicates, while in Java it is `Invoke` in a loop, so the semantics differed.
* Related issue: local processor allocation was not used (the processor pointer was always passed as 0), causing performance degradation for `Invoke` calls on local cache entries (which is exactly the case with `StreamTransformer`).
---
.../processors/platform/cache/PlatformCache.java | 16 +-
.../cpp/core/include/ignite/cache/cache.h | 2 +-
.../cpp/core/include/ignite/impl/operations.h | 34 ++++
.../Dataload/DataStreamerTest.cs | 30 +++
.../Deployment/PeerAssemblyLoadingTest.cs | 2 +
.../Datastream/StreamTransformer.cs | 9 +-
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 223 ++++++++++++++-------
.../Impl/Transactions/CacheTransactionManager.cs | 7 +-
.../Impl/Unmanaged/UnmanagedCallbacks.cs | 2 +-
.../platforms/dotnet/Apache.Ignite.sln.DotSettings | 1 +
10 files changed, 234 insertions(+), 92 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 23883e40..fbab71b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -720,7 +720,9 @@ public class PlatformCache extends PlatformAbstractTarget {
case OP_INVOKE_ASYNC: {
Object key = reader.readObjectDetached();
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+ long ptr = reader.readLong();
+
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
readAndListenFuture(reader, cache.invokeAsync(key, proc), WRITER_INVOKE);
@@ -730,7 +732,9 @@ public class PlatformCache extends PlatformAbstractTarget {
case OP_INVOKE_ALL_ASYNC: {
Set<Object> keys = PlatformUtils.readSet(reader);
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+ long ptr = reader.readLong();
+
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
readAndListenFuture(reader, cache.invokeAllAsync(keys, proc), WRITER_INVOKE_ALL);
@@ -745,16 +749,16 @@ public class PlatformCache extends PlatformAbstractTarget {
case OP_INVOKE: {
Object key = reader.readObjectDetached();
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+ long ptr = reader.readLong();
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
return writeResult(mem, cache.invoke(key, proc));
}
case OP_INVOKE_ALL: {
Set<Object> keys = PlatformUtils.readSet(reader);
-
- CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+ long ptr = reader.readLong();
+ CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
Map results = cache.invokeAll(keys, proc);
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index b6afdea..e18654d 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -1624,7 +1624,7 @@ namespace ignite
R res;
ProcessorHolder procHolder(processor, arg);
- impl::In2Operation<K, ProcessorHolder> inOp(key, procHolder);
+ impl::InCacheInvokeOperation<K, ProcessorHolder> inOp(key, procHolder);
impl::Out1Operation<R> outOp(res);
impl.Get()->Invoke(inOp, outOp, err);
diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h
index 9f816bf..39e3db5 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/operations.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h
@@ -254,6 +254,40 @@ namespace ignite
};
/**
+ * Cache Invoke input operation: writes the first value, a 64-bit zero, then the second value.
+ */
+ template<typename T1, typename T2>
+ class InCacheInvokeOperation : public InputOperation
+ {
+ public:
+ /**
+ * Constructor.
+ *
+ * @param val1 First value (cache.h passes the cache key); held by reference, not copied.
+ * @param val2 Second value (cache.h passes the processor holder); held by reference, not copied.
+ */
+ InCacheInvokeOperation(const T1& val1, const T2& val2) : val1(val1), val2(val2)
+ {
+ // No-op.
+ }
+
+ virtual void ProcessInput(ignite::impl::binary::BinaryWriterImpl& writer)
+ {
+ writer.WriteTopObject<T1>(val1);
+ writer.WriteInt64(0); // Fills the long PlatformCache reads between key and processor; always 0 here.
+ writer.WriteTopObject<T2>(val2);
+ }
+ private:
+ /** First value. */
+ const T1& val1;
+
+ /** Second value. */
+ const T2& val2;
+
+ IGNITE_NO_COPY_ASSIGNMENT(InCacheInvokeOperation)
+ };
+
+ /**
* Input iterator operation.
*/
template<typename K, typename V, typename Iter>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index 3e69d20..708f2c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -503,6 +503,27 @@ namespace Apache.Ignite.Core.Tests.Dataload
TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorBinarizable()));
}
+ [Test]
+ public void TestStreamTransformerIsInvokedForDuplicateKeys()
+ {
+ var cache = _grid.GetOrCreateCache<string, long>("c");
+
+ using (var streamer = _grid.GetDataStreamer<string, long>(cache.Name))
+ {
+ streamer.AllowOverwrite = true;
+ streamer.Receiver = new StreamTransformer<string, long, object, object>(new CountingEntryProcessor());
+
+ var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2)); // "a" x3 followed by "b" x2.
+ foreach (var word in words)
+ {
+ streamer.AddData(word, 1L); // The same key is deliberately added more than once.
+ }
+ }
+
+ Assert.AreEqual(3, cache.Get("a")); // Expects one processor increment per streamed "a".
+ Assert.AreEqual(2, cache.Get("b")); // Expects one processor increment per streamed "b".
+ }
+
/// <summary>
/// Tests specified receiver.
/// </summary>
@@ -665,5 +686,14 @@ namespace Apache.Ignite.Core.Tests.Dataload
public Container Inner;
}
+ private class CountingEntryProcessor : ICacheEntryProcessor<string, long, object, object>
+ {
+ public object Process(IMutableCacheEntry<string, long> e, object arg)
+ {
+ e.Value++; // Increments the entry value on every call; arg is not used.
+
+ return null;
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
index 23be6dc..8933956 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
@@ -19,8 +19,10 @@ namespace Apache.Ignite.Core.Tests.Deployment
{
extern alias ExamplesDll;
using System;
+ using System.Diagnostics;
using System.IO;
using System.Threading;
+ using System.Threading.Tasks;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;
using Apache.Ignite.Core.Deployment;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
index 7413aee..366b158 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
@@ -58,12 +58,9 @@ namespace Apache.Ignite.Core.Datastream
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
public void Receive(ICache<TK, TV> cache, ICollection<ICacheEntry<TK, TV>> entries)
{
- var keys = new List<TK>(entries.Count);
-
+ // Don't use InvokeAll because semantics is different, e.g. duplicate keys are ignored.
foreach (var entry in entries)
- keys.Add(entry.Key);
-
- cache.InvokeAll(keys, _proc, default(TArg));
+ cache.Invoke(entry.Key, _proc, default(TArg));
}
/** <inheritdoc /> */
@@ -76,4 +73,4 @@ namespace Apache.Ignite.Core.Datastream
w.WriteObject(_proc);
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 97b31be..56ce51f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -481,7 +481,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
DoOutOp(CacheOp.Put, key, val);
}
@@ -492,7 +492,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.PutAsync, key, val);
}
@@ -503,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
}
@@ -514,7 +514,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.GetAndPutAsync, w =>
{
@@ -529,7 +529,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutInOpNullable(CacheOp.GetAndReplace, key, val);
}
@@ -540,7 +540,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w =>
{
@@ -554,7 +554,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- StartTx();
+ StartTxIfNeeded();
return DoOutInOpNullable(CacheOp.GetAndRemove, key);
}
@@ -564,7 +564,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r));
}
@@ -575,7 +575,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOp(CacheOp.PutIfAbsent, key, val);
}
@@ -586,7 +586,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val);
}
@@ -597,7 +597,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val);
}
@@ -608,7 +608,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w =>
{
@@ -623,7 +623,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOp(CacheOp.Replace2, key, val);
}
@@ -634,7 +634,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val);
}
@@ -646,7 +646,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- StartTx();
+ StartTxIfNeeded();
return DoOutOp(CacheOp.Replace3, key, oldVal, newVal);
}
@@ -658,7 +658,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync<bool>(CacheOp.Replace3Async, w =>
{
@@ -673,7 +673,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
- StartTx();
+ StartTxIfNeeded();
DoOutOp(CacheOp.PutAll, writer => writer.WriteDictionary(vals));
}
@@ -683,7 +683,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(vals, "vals");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.PutAllAsync, writer => writer.WriteDictionary(vals));
}
@@ -761,7 +761,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- StartTx();
+ StartTxIfNeeded();
return DoOutOp(CacheOp.RemoveObj, key);
}
@@ -771,7 +771,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key);
}
@@ -782,7 +782,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOp(CacheOp.RemoveBool, key, val);
}
@@ -793,7 +793,7 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val);
}
@@ -803,7 +803,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- StartTx();
+ StartTxIfNeeded();
DoOutOp(CacheOp.RemoveAll, writer => writer.WriteEnumerable(keys));
}
@@ -813,7 +813,7 @@ namespace Apache.Ignite.Core.Impl.Cache
{
IgniteArgumentCheck.NotNull(keys, "keys");
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => writer.WriteEnumerable(keys));
}
@@ -821,7 +821,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public void RemoveAll()
{
- StartTx();
+ StartTxIfNeeded();
DoOutInOp((int) CacheOp.RemoveAll2);
}
@@ -829,7 +829,7 @@ namespace Apache.Ignite.Core.Impl.Cache
/** <inheritDoc /> */
public Task RemoveAllAsync()
{
- StartTx();
+ StartTxIfNeeded();
return DoOutOpAsync(CacheOp.RemoveAll2Async);
}
@@ -976,19 +976,30 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(processor, "processor");
- StartTx();
+ StartTxIfNeeded();
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutInOpX((int) CacheOp.Invoke,
- writer =>
- {
- writer.WriteObjectDetached(key);
- writer.WriteObjectDetached(holder);
- },
- (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
- _readException);
+ var ptr = AllocateIfNoTx(holder);
+
+ try
+ {
+ return DoOutInOpX((int) CacheOp.Invoke,
+ writer =>
+ {
+ writer.WriteObjectDetached(key);
+ writer.WriteLong(ptr);
+ writer.WriteObjectDetached(holder);
+ },
+ (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
+ _readException);
+ }
+ finally
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
+ }
}
/** <inheritDoc /> */
@@ -997,28 +1008,44 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(processor, "processor");
- StartTx();
+ StartTxIfNeeded();
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutOpAsync(CacheOp.InvokeAsync, writer =>
- {
- writer.WriteObjectDetached(key);
- writer.WriteObjectDetached(holder);
- },
- r =>
- {
- if (r == null)
- return default(TRes);
+ var ptr = AllocateIfNoTx(holder);
+
+ try
+ {
+ return DoOutOpAsync(CacheOp.InvokeAsync, writer =>
+ {
+ writer.WriteObjectDetached(key);
+ writer.WriteLong(ptr);
+ writer.WriteObjectDetached(holder);
+ },
+ reader =>
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
- var hasError = r.ReadBoolean();
+ if (reader == null)
+ return default(TRes);
- if (hasError)
- throw ReadException(r);
+ var hasError = reader.ReadBoolean();
- return r.ReadObject<TRes>();
- });
+ if (hasError)
+ throw ReadException(reader);
+
+ return reader.ReadObject<TRes>();
+ });
+ }
+ catch (Exception)
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
+
+ throw;
+ }
}
/** <inheritdoc /> */
@@ -1028,20 +1055,31 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(keys, "keys");
IgniteArgumentCheck.NotNull(processor, "processor");
- StartTx();
+ StartTxIfNeeded();
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutInOpX((int) CacheOp.InvokeAll,
- writer =>
- {
- writer.WriteEnumerable(keys);
- writer.Write(holder);
- },
- (input, res) => res == True
- ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
- : null, _readException);
+ var ptr = AllocateIfNoTx(holder);
+
+ try
+ {
+ return DoOutInOpX((int) CacheOp.InvokeAll,
+ writer =>
+ {
+ writer.WriteEnumerable(keys);
+ writer.WriteLong(ptr);
+ writer.Write(holder);
+ },
+ (input, res) => res == True
+ ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
+ : null, _readException);
+ }
+ finally
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
+ }
}
/** <inheritDoc /> */
@@ -1051,19 +1089,37 @@ namespace Apache.Ignite.Core.Impl.Cache
IgniteArgumentCheck.NotNull(keys, "keys");
IgniteArgumentCheck.NotNull(processor, "processor");
- StartTx();
+ StartTxIfNeeded();
var holder = new CacheEntryProcessorHolder(processor, arg,
(e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
- return DoOutOpAsync(CacheOp.InvokeAllAsync,
- writer =>
- {
- writer.WriteEnumerable(keys);
- writer.Write(holder);
- },
- input => ReadInvokeAllResults<TRes>(input));
+ var ptr = AllocateIfNoTx(holder);
+
+ try
+ {
+ return DoOutOpAsync(CacheOp.InvokeAllAsync,
+ writer =>
+ {
+ writer.WriteEnumerable(keys);
+ writer.WriteLong(ptr);
+ writer.Write(holder);
+ },
+ reader =>
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
+
+ return ReadInvokeAllResults<TRes>(reader);
+ });
+ }
+ catch (Exception)
+ {
+ if (ptr != 0)
+ _ignite.HandleRegistry.Release(ptr);
+ throw;
+ }
}
/** <inheritDoc /> */
@@ -1544,15 +1600,6 @@ namespace Apache.Ignite.Core.Impl.Cache
DoOutInOp((int) CacheOp.CloseLock, id);
}
- /// <summary>
- /// Starts a transaction when applicable.
- /// </summary>
- private void StartTx()
- {
- if (_txManager != null)
- _txManager.StartTx();
- }
-
/** <inheritdoc /> */
public IQueryMetrics GetQueryMetrics()
{
@@ -1587,5 +1634,27 @@ namespace Apache.Ignite.Core.Impl.Cache
{
return DoOutOp(CacheOp.LocalPreloadPartition, w => w.WriteInt(partition));
}
+
+ /// <summary>
+ /// Starts a transaction through the transaction manager, if a manager is present.
+ /// </summary>
+ private void StartTxIfNeeded()
+ {
+ if (_txManager != null)
+ _txManager.StartTx();
+ }
+
+ /// <summary>
+ /// Allocates a handle for the object in the handle registry, but only when there is no active transaction.
+ /// </summary>
+ /// <returns>Handle (callers release it via HandleRegistry.Release), or 0 when there is an active transaction.</returns>
+ private long AllocateIfNoTx(object obj)
+ {
+ // With transactions, actual cache operation execution is delayed, so we don't control handle lifetime.
+ if (_txManager != null && _txManager.IsInTx())
+ return 0;
+
+ return _ignite.HandleRegistry.Allocate(obj);
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
index a2ef63c..9838689 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
@@ -82,6 +82,11 @@ namespace Apache.Ignite.Core.Impl.Transactions
}
}
+ public bool IsInTx()
+ {
+ return _transactions.Tx != null; // Non-null Tx counts as "in transaction" (presumably the current thread's tx - confirm).
+ }
+
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
@@ -171,4 +176,4 @@ namespace Apache.Ignite.Core.Impl.Transactions
}
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 855d11d..a597fa0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -1339,4 +1339,4 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
}
}
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index 3f4e493..a45f1d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -9,6 +9,7 @@
<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EXml_002ECodeStyle_002EFormatSettingsUpgrade_002EXmlMoveToCommonFormatterSettingsUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/UnitTesting/ShadowCopy/@EntryValue">False</s:Boolean>
+ <s:Boolean x:Key="/Default/UserDictionary/Words/=Dataload/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Failover/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=preloads/@EntryIndexedValue">True</s:Boolean>
</wpf:ResourceDictionary>
\ No newline at end of file