You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2019/08/30 16:16:09 UTC

[ignite] branch master updated: GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1072bac  GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys
1072bac is described below

commit 1072bacd249ccafedf15bddbcca0e4fdd3da805c
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Aug 30 19:15:53 2019 +0300

    GG-21615 .NET: Fix StreamTransformer ignoring duplicate keys
    
    * Root cause: `StreamTransformer` used `InvokeAll`, while in Java it is `Invoke` in a loop, causing different semantics.
    * Related issue: local processor allocation is not used, causing performance degradation for `Invoke` calls on local cache entries (which is exactly what's going on with `StreamTransformer`)
---
 .../processors/platform/cache/PlatformCache.java   |  16 +-
 .../cpp/core/include/ignite/cache/cache.h          |   2 +-
 .../cpp/core/include/ignite/impl/operations.h      |  34 ++++
 .../Dataload/DataStreamerTest.cs                   |  30 +++
 .../Deployment/PeerAssemblyLoadingTest.cs          |   2 +
 .../Datastream/StreamTransformer.cs                |   9 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs     | 223 ++++++++++++++-------
 .../Impl/Transactions/CacheTransactionManager.cs   |   7 +-
 .../Impl/Unmanaged/UnmanagedCallbacks.cs           |   2 +-
 .../platforms/dotnet/Apache.Ignite.sln.DotSettings |   1 +
 10 files changed, 234 insertions(+), 92 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 23883e40..fbab71b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -720,7 +720,9 @@ public class PlatformCache extends PlatformAbstractTarget {
                 case OP_INVOKE_ASYNC: {
                     Object key = reader.readObjectDetached();
 
-                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+                    long ptr = reader.readLong();
+
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
 
                     readAndListenFuture(reader, cache.invokeAsync(key, proc), WRITER_INVOKE);
 
@@ -730,7 +732,9 @@ public class PlatformCache extends PlatformAbstractTarget {
                 case OP_INVOKE_ALL_ASYNC: {
                     Set<Object> keys = PlatformUtils.readSet(reader);
 
-                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+                    long ptr = reader.readLong();
+
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
 
                     readAndListenFuture(reader, cache.invokeAllAsync(keys, proc), WRITER_INVOKE_ALL);
 
@@ -745,16 +749,16 @@ public class PlatformCache extends PlatformAbstractTarget {
 
                 case OP_INVOKE: {
                     Object key = reader.readObjectDetached();
-
-                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+                    long ptr = reader.readLong();
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
 
                     return writeResult(mem, cache.invoke(key, proc));
                 }
 
                 case OP_INVOKE_ALL: {
                     Set<Object> keys = PlatformUtils.readSet(reader);
-
-                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+                    long ptr = reader.readLong();
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), ptr);
 
                     Map results = cache.invokeAll(keys, proc);
 
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index b6afdea..e18654d 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -1624,7 +1624,7 @@ namespace ignite
                 R res;
                 ProcessorHolder procHolder(processor, arg);
 
-                impl::In2Operation<K, ProcessorHolder> inOp(key, procHolder);
+                impl::InCacheInvokeOperation<K, ProcessorHolder> inOp(key, procHolder);
                 impl::Out1Operation<R> outOp(res);
 
                 impl.Get()->Invoke(inOp, outOp, err);
diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h
index 9f816bf..39e3db5 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/operations.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h
@@ -254,6 +254,40 @@ namespace ignite
         };
 
         /**
+         * Cache Invoke input operation.
+         */
+        template<typename T1, typename T2>
+        class InCacheInvokeOperation : public InputOperation
+        {
+        public:
+            /**
+             * Constructor.
+             *
+             * @param val1 First value.
+             * @param val2 Second value.
+             */
+            InCacheInvokeOperation(const T1& val1, const T2& val2) : val1(val1), val2(val2)
+            {
+                // No-op.
+            }
+
+            virtual void ProcessInput(ignite::impl::binary::BinaryWriterImpl& writer)
+            {
+                writer.WriteTopObject<T1>(val1);
+                writer.WriteInt64(0);
+                writer.WriteTopObject<T2>(val2);
+            }
+        private:
+            /** First value. */
+            const T1& val1;
+
+            /** Second value. */
+            const T2& val2;
+
+            IGNITE_NO_COPY_ASSIGNMENT(InCacheInvokeOperation)
+        };
+
+        /**
          * Input iterator operation.
          */
         template<typename K, typename V, typename Iter>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
index 3e69d20..708f2c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs
@@ -503,6 +503,27 @@ namespace Apache.Ignite.Core.Tests.Dataload
             TestStreamReceiver(new StreamTransformer<int, int, int, int>(new EntryProcessorBinarizable()));
         }
 
+        [Test]
+        public void TestStreamTransformerIsInvokedForDuplicateKeys()
+        {
+            var cache = _grid.GetOrCreateCache<string, long>("c");
+
+            using (var streamer = _grid.GetDataStreamer<string, long>(cache.Name))
+            {
+                streamer.AllowOverwrite = true;
+                streamer.Receiver = new StreamTransformer<string, long, object, object>(new CountingEntryProcessor());
+
+                var words = Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
+                foreach (var word in words)
+                {
+                    streamer.AddData(word, 1L);
+                }
+            }
+
+            Assert.AreEqual(3, cache.Get("a"));
+            Assert.AreEqual(2, cache.Get("b"));
+        }
+
         /// <summary>
         /// Tests specified receiver.
         /// </summary>
@@ -665,5 +686,14 @@ namespace Apache.Ignite.Core.Tests.Dataload
             public Container Inner;
         }
 
+        private class CountingEntryProcessor : ICacheEntryProcessor<string, long, object, object>
+        {
+            public object Process(IMutableCacheEntry<string, long> e, object arg)
+            {
+                e.Value++;
+
+                return null;
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
index 23be6dc..8933956 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Deployment/PeerAssemblyLoadingTest.cs
@@ -19,8 +19,10 @@ namespace Apache.Ignite.Core.Tests.Deployment
 {
     extern alias ExamplesDll;
     using System;
+    using System.Diagnostics;
     using System.IO;
     using System.Threading;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Compute;
     using Apache.Ignite.Core.Deployment;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
index 7413aee..366b158 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/StreamTransformer.cs
@@ -58,12 +58,9 @@ namespace Apache.Ignite.Core.Datastream
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
         public void Receive(ICache<TK, TV> cache, ICollection<ICacheEntry<TK, TV>> entries)
         {
-            var keys = new List<TK>(entries.Count);
-
+            // Don't use InvokeAll because semantics is different, e.g. duplicate keys are ignored.
             foreach (var entry in entries)
-                keys.Add(entry.Key);
-
-            cache.InvokeAll(keys, _proc, default(TArg));
+                cache.Invoke(entry.Key, _proc, default(TArg));
         }
 
         /** <inheritdoc /> */
@@ -76,4 +73,4 @@ namespace Apache.Ignite.Core.Datastream
             w.WriteObject(_proc);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 97b31be..56ce51f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -481,7 +481,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             DoOutOp(CacheOp.Put, key, val);
         }
@@ -492,7 +492,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.PutAsync, key, val);
         }
@@ -503,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
         }
@@ -514,7 +514,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.GetAndPutAsync, w =>
             {
@@ -529,7 +529,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutInOpNullable(CacheOp.GetAndReplace, key, val);
         }
@@ -540,7 +540,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w =>
             {
@@ -554,7 +554,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutInOpNullable(CacheOp.GetAndRemove, key);
         }
@@ -564,7 +564,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r));
         }
@@ -575,7 +575,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOp(CacheOp.PutIfAbsent, key, val);
         }
@@ -586,7 +586,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val);
         }
@@ -597,7 +597,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val);
         }
@@ -608,7 +608,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w =>
             {
@@ -623,7 +623,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOp(CacheOp.Replace2, key, val);
         }
@@ -634,7 +634,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val);
         }
@@ -646,7 +646,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(oldVal, "oldVal");
             IgniteArgumentCheck.NotNull(newVal, "newVal");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOp(CacheOp.Replace3, key, oldVal, newVal);
         }
@@ -658,7 +658,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(oldVal, "oldVal");
             IgniteArgumentCheck.NotNull(newVal, "newVal");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync<bool>(CacheOp.Replace3Async, w =>
             {
@@ -673,7 +673,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(vals, "vals");
 
-            StartTx();
+            StartTxIfNeeded();
 
             DoOutOp(CacheOp.PutAll, writer => writer.WriteDictionary(vals));
         }
@@ -683,7 +683,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(vals, "vals");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.PutAllAsync, writer => writer.WriteDictionary(vals));
         }
@@ -761,7 +761,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOp(CacheOp.RemoveObj, key);
         }
@@ -771,7 +771,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key);
         }
@@ -782,7 +782,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOp(CacheOp.RemoveBool, key, val);
         }
@@ -793,7 +793,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(val, "val");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val);
         }
@@ -803,7 +803,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
 
-            StartTx();
+            StartTxIfNeeded();
 
             DoOutOp(CacheOp.RemoveAll, writer => writer.WriteEnumerable(keys));
         }
@@ -813,7 +813,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(keys, "keys");
 
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => writer.WriteEnumerable(keys));
         }
@@ -821,7 +821,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public void RemoveAll()
         {
-            StartTx();
+            StartTxIfNeeded();
 
             DoOutInOp((int) CacheOp.RemoveAll2);
         }
@@ -829,7 +829,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task RemoveAllAsync()
         {
-            StartTx();
+            StartTxIfNeeded();
 
             return DoOutOpAsync(CacheOp.RemoveAll2Async);
         }
@@ -976,19 +976,30 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(processor, "processor");
 
-            StartTx();
+            StartTxIfNeeded();
 
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
-            return DoOutInOpX((int) CacheOp.Invoke,
-                writer =>
-                {
-                    writer.WriteObjectDetached(key);
-                    writer.WriteObjectDetached(holder);
-                },
-                (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
-                _readException);
+            var ptr = AllocateIfNoTx(holder);
+
+            try
+            {
+                return DoOutInOpX((int) CacheOp.Invoke,
+                    writer =>
+                    {
+                        writer.WriteObjectDetached(key);
+                        writer.WriteLong(ptr);
+                        writer.WriteObjectDetached(holder);
+                    },
+                    (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes),
+                    _readException);
+            }
+            finally
+            {
+                if (ptr != 0)
+                    _ignite.HandleRegistry.Release(ptr);
+            }
         }
 
         /** <inheritDoc /> */
@@ -997,28 +1008,44 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(key, "key");
             IgniteArgumentCheck.NotNull(processor, "processor");
 
-            StartTx();
+            StartTxIfNeeded();
 
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
-            return DoOutOpAsync(CacheOp.InvokeAsync, writer =>
-                {
-                    writer.WriteObjectDetached(key);
-                    writer.WriteObjectDetached(holder);
-                },
-                r =>
-                {
-                    if (r == null)
-                        return default(TRes);
+            var ptr = AllocateIfNoTx(holder);
+
+            try
+            {
+                return DoOutOpAsync(CacheOp.InvokeAsync, writer =>
+                    {
+                        writer.WriteObjectDetached(key);
+                        writer.WriteLong(ptr);
+                        writer.WriteObjectDetached(holder);
+                    },
+                    reader =>
+                    {
+                        if (ptr != 0)
+                            _ignite.HandleRegistry.Release(ptr);
 
-                    var hasError = r.ReadBoolean();
+                        if (reader == null)
+                            return default(TRes);
 
-                    if (hasError)
-                        throw ReadException(r);
+                        var hasError = reader.ReadBoolean();
 
-                    return r.ReadObject<TRes>();
-                });
+                        if (hasError)
+                            throw ReadException(reader);
+
+                        return reader.ReadObject<TRes>();
+                    });
+            }
+            catch (Exception)
+            {
+                if (ptr != 0)
+                    _ignite.HandleRegistry.Release(ptr);
+
+                throw;
+            }
         }
 
         /** <inheritdoc /> */
@@ -1028,20 +1055,31 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(keys, "keys");
             IgniteArgumentCheck.NotNull(processor, "processor");
 
-            StartTx();
+            StartTxIfNeeded();
 
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
-            return DoOutInOpX((int) CacheOp.InvokeAll,
-                writer =>
-                {
-                    writer.WriteEnumerable(keys);
-                    writer.Write(holder);
-                },
-                (input, res) => res == True
-                    ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
-                    : null, _readException);
+            var ptr = AllocateIfNoTx(holder);
+
+            try
+            {
+                return DoOutInOpX((int) CacheOp.InvokeAll,
+                    writer =>
+                    {
+                        writer.WriteEnumerable(keys);
+                        writer.WriteLong(ptr);
+                        writer.Write(holder);
+                    },
+                    (input, res) => res == True
+                        ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary))
+                        : null, _readException);
+            }
+            finally
+            {
+                if (ptr != 0)
+                    _ignite.HandleRegistry.Release(ptr);
+            }
         }
 
         /** <inheritDoc /> */
@@ -1051,19 +1089,37 @@ namespace Apache.Ignite.Core.Impl.Cache
             IgniteArgumentCheck.NotNull(keys, "keys");
             IgniteArgumentCheck.NotNull(processor, "processor");
 
-            StartTx();
+            StartTxIfNeeded();
 
             var holder = new CacheEntryProcessorHolder(processor, arg,
                 (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
-            return DoOutOpAsync(CacheOp.InvokeAllAsync,
-                writer =>
-                {
-                    writer.WriteEnumerable(keys);
-                    writer.Write(holder);
-                },
-                input => ReadInvokeAllResults<TRes>(input));
+            var ptr = AllocateIfNoTx(holder);
+
+            try
+            {
+                return DoOutOpAsync(CacheOp.InvokeAllAsync,
+                    writer =>
+                    {
+                        writer.WriteEnumerable(keys);
+                        writer.WriteLong(ptr);
+                        writer.Write(holder);
+                    },
+                    reader =>
+                    {
+                        if (ptr != 0)
+                            _ignite.HandleRegistry.Release(ptr);
+
+                        return ReadInvokeAllResults<TRes>(reader);
+                    });
+            }
+            catch (Exception)
+            {
+                if (ptr != 0)
+                    _ignite.HandleRegistry.Release(ptr);
 
+                throw;
+            }
         }
 
         /** <inheritDoc /> */
@@ -1544,15 +1600,6 @@ namespace Apache.Ignite.Core.Impl.Cache
             DoOutInOp((int) CacheOp.CloseLock, id);
         }
 
-        /// <summary>
-        /// Starts a transaction when applicable.
-        /// </summary>
-        private void StartTx()
-        {
-            if (_txManager != null)
-                _txManager.StartTx();
-        }
-
         /** <inheritdoc /> */
         public IQueryMetrics GetQueryMetrics()
         {
@@ -1587,5 +1634,27 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             return DoOutOp(CacheOp.LocalPreloadPartition, w => w.WriteInt(partition));
         }
+
+        /// <summary>
+        /// Starts a transaction when applicable.
+        /// </summary>
+        private void StartTxIfNeeded()
+        {
+            if (_txManager != null)
+                _txManager.StartTx();
+        }
+
+        /// <summary>
+        /// Allocates a handle only when there is no active transaction.
+        /// </summary>
+        /// <returns>Handle or 0 when there is an active transaction.</returns>
+        private long AllocateIfNoTx(object obj)
+        {
+            // With transactions, actual cache operation execution is delayed, so we don't control handle lifetime.
+            if (_txManager != null && _txManager.IsInTx())
+                return 0;
+
+            return _ignite.HandleRegistry.Allocate(obj);
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
index a2ef63c..9838689 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Transactions/CacheTransactionManager.cs
@@ -82,6 +82,11 @@ namespace Apache.Ignite.Core.Impl.Transactions
             }
         }
 
+        public bool IsInTx()
+        {
+            return _transactions.Tx != null;
+        }
+
         /** <inheritdoc /> */
         [SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods")]
         void IEnlistmentNotification.Prepare(PreparingEnlistment preparingEnlistment)
@@ -171,4 +176,4 @@ namespace Apache.Ignite.Core.Impl.Transactions
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index 855d11d..a597fa0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -1339,4 +1339,4 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
index 3f4e493..a45f1d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
+++ b/modules/platforms/dotnet/Apache.Ignite.sln.DotSettings
@@ -9,6 +9,7 @@
 	<s:String x:Key="/Default/CodeInspection/Highlighting/InspectionSeverities/=ConvertClosureToMethodGroup/@EntryIndexedValue">DO_NOT_SHOW</s:String>
 	<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002EXml_002ECodeStyle_002EFormatSettingsUpgrade_002EXmlMoveToCommonFormatterSettingsUpgrade/@EntryIndexedValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/Environment/UnitTesting/ShadowCopy/@EntryValue">False</s:Boolean>
+	<s:Boolean x:Key="/Default/UserDictionary/Words/=Dataload/@EntryIndexedValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/UserDictionary/Words/=Failover/@EntryIndexedValue">True</s:Boolean>
 	<s:Boolean x:Key="/Default/UserDictionary/Words/=preloads/@EntryIndexedValue">True</s:Boolean>
 </wpf:ResourceDictionary>
\ No newline at end of file