You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/01 05:55:22 UTC

[12/21] ignite git commit: IGNITE-7764: MVCC: cache API support. This closes #4725.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 4d5fa13..a83a93f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -42,8 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
@@ -61,6 +59,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -68,6 +70,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.C2;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -186,13 +189,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (keyCheck)
             validateCacheKey(key);
 
-        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = ctx.mvccEnabled() ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean recovery = opCtx != null && opCtx.recovery();
 
-        if (tx != null && !tx.implicit() && !skipTx) {
+        // In MVCC mode, get operations bypass the transaction.
+        if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<V>() {
                 @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
                     IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
@@ -230,6 +234,26 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);
 
+        MvccSnapshot mvccSnapshot = null;
+        MvccQueryTracker mvccTracker = null;
+
+        if (ctx.mvccEnabled()) {
+            try {
+                if (tx != null)
+                    mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+                else {
+                    mvccTracker = MvccUtils.mvccTracker(ctx, null);
+
+                    mvccSnapshot = mvccTracker.snapshot();
+                }
+
+                assert mvccSnapshot != null;
+            }
+            catch (IgniteCheckedException ex) {
+                return new GridFinishedFuture<>(ex);
+            }
+        }
+
         GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
             ctx.toCacheKeyObject(key),
             topVer,
@@ -243,10 +267,21 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             needVer,
             /*keepCacheObjects*/false,
             opCtx != null && opCtx.recovery(),
-            null);
+            mvccSnapshot);
 
         fut.init();
 
+        if(mvccTracker != null){
+            final MvccQueryTracker mvccTracker0 = mvccTracker;
+
+            fut.listen(new CI1<IgniteInternalFuture<Object>>() {
+                @Override public void apply(IgniteInternalFuture<Object> future) {
+                    if(future.isDone())
+                        mvccTracker0.onDone();
+                }
+            });
+        }
+
         return (IgniteInternalFuture<V>)fut;
     }
 
@@ -270,13 +305,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (keyCheck)
             validateCacheKeys(keys);
 
-        GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
+        GridNearTxLocal tx = (ctx.mvccEnabled()) ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx);
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        if (tx != null && !tx.implicit() && !skipTx) {
+        if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-                @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
+                /** {@inheritDoc} */
+                @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx,
+                    AffinityTopologyVersion readyTopVer) {
                     return tx.getAllAsync(ctx,
                         readyTopVer,
                         ctx.cacheKeysView(keys),
@@ -290,14 +327,34 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             }, opCtx, /*retry*/false);
         }
 
-        AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
-
         subjId = ctx.subjectIdPerCall(subjId, opCtx);
 
-        return loadAsync(
+        MvccSnapshot mvccSnapshot = null;
+        MvccQueryTracker mvccTracker = null;
+
+        if (ctx.mvccEnabled()) {
+            try {
+                if (tx != null)
+                    mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx);
+                else {
+                    mvccTracker = MvccUtils.mvccTracker(ctx, null);
+
+                    mvccSnapshot = mvccTracker.snapshot();
+                }
+
+                assert mvccSnapshot != null;
+            }
+            catch (IgniteCheckedException ex) {
+                return new GridFinishedFuture(ex);
+            }
+        }
+
+        AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+
+        IgniteInternalFuture<Map<K, V>> fut = loadAsync(
             ctx.cacheKeysView(keys),
             opCtx == null || !opCtx.skipStore(),
-            forcePrimary,
+            forcePrimary ,
             topVer,
             subjId,
             taskName,
@@ -305,46 +362,23 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             recovery,
             skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
             skipVals,
-            needVer);
-    }
-
-    /**
-     * @param keys Keys to load.
-     * @param readThrough Read through flag.
-     * @param forcePrimary Force get from primary node flag.
-     * @param topVer Topology version.
-     * @param subjId Subject ID.
-     * @param taskName Task name.
-     * @param deserializeBinary Deserialize binary flag.
-     * @param expiryPlc Expiry policy.
-     * @param skipVals Skip values flag.
-     * @param needVer Need version.
-     * @return Loaded values.
-     */
-    private IgniteInternalFuture<Map<K, V>> loadAsync(
-        @Nullable Collection<KeyCacheObject> keys,
-        boolean readThrough,
-        boolean forcePrimary,
-        AffinityTopologyVersion topVer,
-        @Nullable UUID subjId,
-        String taskName,
-        boolean deserializeBinary,
-        boolean recovery,
-        @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals,
-        boolean needVer) {
-        return loadAsync(keys,
-            readThrough,
-            forcePrimary,
-            topVer, subjId,
-            taskName,
-            deserializeBinary,
-            recovery,
-            expiryPlc,
-            skipVals,
             needVer,
             false,
-            null);
+            mvccSnapshot);
+
+        if(mvccTracker != null){
+            final MvccQueryTracker mvccTracker0 = mvccTracker;
+
+            fut.listen(new CI1<IgniteInternalFuture<Map<K, V>>>() {
+                /** {@inheritDoc} */
+                @Override public void apply(IgniteInternalFuture<Map<K, V>> future) {
+                    if(future.isDone())
+                        mvccTracker0.onDone();
+                }
+            });
+        }
+
+        return fut;
     }
 
     /**
@@ -445,7 +479,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(ctx, key);
+                        CacheDataRow row = mvccSnapshot != null ?
+                            ctx.offheap().mvccRead(ctx, key, mvccSnapshot) :
+                            ctx.offheap().read(ctx, key);
 
                         if (row != null) {
                             long expireTime = row.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index b167f26..85a48a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -287,10 +287,15 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         boolean queryMapped = false;
 
-        for (GridDistributedTxMapping m : F.view(tx.mappings().mappings(), CU.FILTER_QUERY_MAPPING)) {
+        assert !tx.implicitSingle() || tx.queryEnlisted(); // Non-mvcc implicit-single tx goes fast commit way.
+
+        Collection<GridDistributedTxMapping> txMappings = !tx.implicitSingle() ? tx.mappings().mappings()
+            : Collections.singleton(tx.mappings().singleMapping());
+
+        for (GridDistributedTxMapping m : F.view(txMappings, CU.FILTER_QUERY_MAPPING)) {
             GridDistributedTxMapping nodeMapping = mappings.get(m.primary().id());
 
-            if(nodeMapping == null)
+            if (nodeMapping == null)
                 mappings.put(m.primary().id(), m);
 
             txMapping.addMapping(F.asList(m.primary()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index f484bd6..11f98ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -42,10 +42,10 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -53,11 +53,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundIdentityFuture<Long> implements
-    GridCacheVersionedFuture<Long> {
-    /** */
-    private static final long serialVersionUID = -6069985059301497282L;
-
+public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoundIdentityFuture<T> implements
+    GridCacheVersionedFuture<T> {
     /** Done field updater. */
     private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD =
         AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done");
@@ -117,10 +114,11 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
      * @param cctx Cache context.
      * @param tx Transaction.
      * @param timeout Timeout.
+     * @param rdc Compound future reducer.
      */
     public GridNearTxAbstractEnlistFuture(
-        GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) {
-        super(CU.longReducer());
+        GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout, @Nullable IgniteReducer<T, T> rdc) {
+        super(rdc);
 
         assert cctx != null;
         assert tx != null;
@@ -300,8 +298,6 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
             throw new IgniteCheckedException("Future is done.");
     }
 
-
-
     /**
      */
     private void mapOnTopology() {
@@ -359,7 +355,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Long> fut) {
+    @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) {
         if (ex != null || !EX_UPD.compareAndSet(this, null, err))
             ex.addSuppressed(err);
 
@@ -367,7 +363,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err, boolean cancelled) {
+    @Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) {
         if (!DONE_UPD.compareAndSet(this, 0, 1))
             return false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
new file mode 100644
index 0000000..8d85bd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxEnlistFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler.createResponse;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * A future tracking remote-node transaction enlisting and locking requests produced by cache API operations.
+ */
+public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridCacheReturn> {
+    /** Default batch size. */
+    public static final int DFLT_BATCH_SIZE = 1024;
+
+    /** SkipCntr field updater. */
+    private static final AtomicIntegerFieldUpdater<GridNearTxEnlistFuture> SKIP_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, "skipCntr");
+
+    /** Marker object. */
+    private static final Object FINISHED = new Object();
+
+    /** Source iterator. */
+    @GridToStringExclude
+    private final UpdateSourceIterator<?> it;
+
+    /** Batch size. */
+    private int batchSize;
+
+    /** Counter used to generate batch IDs. */
+    private AtomicInteger batchCntr = new AtomicInteger();
+
+    /** Loop entry counter, updated via {@link #SKIP_UPD}; lets only one thread run the batching loop. */
+    @SuppressWarnings("unused")
+    @GridToStringExclude
+    private volatile int skipCntr;
+
+    /** Future result. */
+    @GridToStringExclude
+    private volatile GridCacheReturn res;
+
+    /** Current batches by primary node ID. */
+    private final Map<UUID, Batch> batches = new ConcurrentHashMap<>();
+
+    /** Row extracted from iterator but not yet used. */
+    private Object peek;
+
+    /** Topology locked flag. */
+    private boolean topLocked;
+
+    /** Ordered batch sending flag. */
+    private final boolean sequential;
+
+    /** Filter. */
+    private final CacheEntryPredicate filter;
+
+    /** Need previous value flag. */
+    private final boolean needRes;
+
+    /**
+     * @param cctx Cache context.
+     * @param tx Transaction.
+     * @param timeout Timeout.
+     * @param it Rows iterator.
+     * @param batchSize Batch size.
+     * @param sequential Sequential locking flag.
+     * @param filter Filter.
+     * @param needRes Need previous value flag.
+     */
+    public GridNearTxEnlistFuture(GridCacheContext<?, ?> cctx,
+        GridNearTxLocal tx,
+        long timeout,
+        UpdateSourceIterator<?> it,
+        int batchSize,
+        boolean sequential,
+        @Nullable CacheEntryPredicate filter,
+        boolean needRes) {
+        super(cctx, tx, timeout, null);
+
+        this.it = it;
+        this.batchSize = batchSize > 0 ? batchSize : DFLT_BATCH_SIZE;
+        this.sequential = sequential;
+        this.filter = filter;
+        this.needRes = needRes;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void map(boolean topLocked) {
+        this.topLocked = topLocked;
+
+        sendNextBatches(null);
+    }
+
+    /**
+     * Continue iterating the data rows and form new batches.
+     *
+     * @param nodeId Node that is ready for a new batch.
+     */
+    private void sendNextBatches(@Nullable UUID nodeId) {
+        try {
+            Collection<Batch> next = continueLoop(nodeId);
+
+            if (next == null)
+                return;
+
+            boolean first = (nodeId != null);
+
+            for (Batch batch : next) {
+                ClusterNode node = batch.node();
+
+                sendBatch(node, batch, first);
+
+                if (!node.isLocal())
+                    first = false;
+            }
+        }
+        catch (Throwable e) {
+            onDone(e);
+
+            if (e instanceof Error)
+                throw (Error)e;
+        }
+    }
+
+    /**
+     * Iterate data rows and form batches.
+     *
+     * @param nodeId ID of the node that acknowledged the last batch.
+     * @return Collection of newly completed batches.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException {
+        if (nodeId != null)
+            batches.remove(nodeId);
+
+        // Accumulate number of batches released since we got here.
+        // Let only one thread do the looping.
+        if (isDone() || SKIP_UPD.getAndIncrement(this) != 0)
+            return null;
+
+        ArrayList<Batch> res = null;
+        Batch batch = null;
+
+        boolean flush = false;
+
+        EnlistOperation op = it.operation();
+
+        while (true) {
+            while (hasNext0()) {
+                checkCompleted();
+
+                Object cur = next0();
+
+                KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+
+                List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+
+                ClusterNode node;
+
+                if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+                    throw new ClusterTopologyCheckedException("Failed to get primary node " +
+                        "[topVer=" + topVer + ", key=" + key + ']');
+
+                tx.markQueryEnlisted(null);
+
+                if (!sequential)
+                    batch = batches.get(node.id());
+                else if (batch != null && !batch.node().equals(node))
+                    res = markReady(res, batch);
+
+                if (batch == null)
+                    batches.put(node.id(), batch = new Batch(node));
+
+                if (batch.ready()) {
+                    // Can't advance further at the moment.
+                    batch = null;
+
+                    peek = cur;
+
+                    it.beforeDetach();
+
+                    flush = true;
+
+                    break;
+                }
+
+                batch.add(op.isDeleteOrLock() ? key : cur,
+                    op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+
+                if (batch.size() == batchSize)
+                    res = markReady(res, batch);
+            }
+
+            if (SKIP_UPD.decrementAndGet(this) == 0)
+                break;
+
+            skipCntr = 1;
+        }
+
+        if (flush)
+            return res;
+
+        // No data left - flush incomplete batches.
+        for (Batch batch0 : batches.values()) {
+            if (!batch0.ready()) {
+                if (res == null)
+                    res = new ArrayList<>();
+
+                batch0.ready(true);
+
+                res.add(batch0);
+            }
+        }
+
+        if (batches.isEmpty())
+            onDone(this.res);
+
+        return res;
+    }
+
+    /** @return Pending peeked row if present, otherwise the next row from the source iterator. */
+    private Object next0() {
+        if (!hasNext0())
+            throw new NoSuchElementException();
+
+        Object cur;
+
+        if ((cur = peek) != null)
+            peek = null;
+        else
+            cur = it.next();
+
+        return cur;
+    }
+
+    /** @return {@code true} if there is a peeked row or the source iterator has more rows. */
+    private boolean hasNext0() {
+        if (peek == null && !it.hasNext())
+            peek = FINISHED;
+
+        return peek != FINISHED;
+    }
+
+    /**
+     * Add batch to batch collection if it is ready.
+     *
+     * @param batches Collection of batches.
+     * @param batch Batch to be added.
+     */
+    private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
+        if (!batch.ready()) {
+            batch.ready(true);
+
+            if (batches == null)
+                batches = new ArrayList<>();
+
+            batches.add(batch);
+        }
+
+        return batches;
+    }
+
+    /**
+     * @param primaryId Primary node id.
+     * @param rows Rows.
+     * @param dhtVer Dht version assigned at primary node.
+     * @param dhtFutId Dht future id assigned at primary node.
+     */
+    private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer,
+        IgniteUuid dhtFutId) {
+        assert dhtVer != null;
+        assert dhtFutId != null;
+
+        EnlistOperation op = it.operation();
+
+        assert op != EnlistOperation.LOCK;
+
+        boolean keysOnly = op.isDeleteOrLock();
+
+        final ArrayList<KeyCacheObject> keys = new ArrayList<>(rows.size());
+        final ArrayList<Message> vals = keysOnly ? null : new ArrayList<>(rows.size());
+
+        for (Object row : rows) {
+            if (keysOnly)
+                keys.add(cctx.toCacheKeyObject(row));
+            else {
+                keys.add(cctx.toCacheKeyObject(((IgniteBiTuple)row).getKey()));
+                vals.add(cctx.toCacheObject(((IgniteBiTuple)row).getValue()));
+            }
+        }
+
+        try {
+            GridDhtTxRemote dhtTx = cctx.tm().tx(dhtVer);
+
+            if (dhtTx == null) {
+                dhtTx = new GridDhtTxRemote(cctx.shared(),
+                    cctx.localNodeId(),
+                    dhtFutId,
+                    primaryId,
+                    lockVer,
+                    topVer,
+                    dhtVer,
+                    null,
+                    cctx.systemTx(),
+                    cctx.ioPolicy(),
+                    PESSIMISTIC,
+                    REPEATABLE_READ,
+                    false,
+                    tx.remainingTime(),
+                    -1,
+                    this.tx.subjectId(),
+                    this.tx.taskNameHash(),
+                    false);
+
+                dhtTx.mvccSnapshot(new MvccSnapshotWithoutTxs(mvccSnapshot.coordinatorVersion(),
+                    mvccSnapshot.counter(), MVCC_OP_COUNTER_NA, mvccSnapshot.cleanupVersion()));
+
+                dhtTx = cctx.tm().onCreated(null, dhtTx);
+
+                if (dhtTx == null || !cctx.tm().onStarted(dhtTx)) {
+                    throw new IgniteTxRollbackCheckedException("Failed to update backup " +
+                        "(transaction has been completed): " + dhtVer);
+                }
+            }
+
+            dhtTx.mvccEnlistBatch(cctx, it.operation(), keys, vals, mvccSnapshot.withoutActiveTransactions());
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+
+            return;
+        }
+
+        sendNextBatches(primaryId);
+    }
+
+    /**
+     *
+     * @param node Node.
+     * @param batch Batch.
+     * @param first First mapping flag.
+     */
+    private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException {
+        updateMappings(node);
+
+        boolean clientFirst = first && cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks();
+
+        int batchId = batchCntr.incrementAndGet();
+
+        if (node.isLocal())
+            enlistLocal(batchId, node.id(), batch);
+        else
+            sendBatch(batchId, node.id(), batch, clientFirst);
+    }
+
+    /**
+     * Send batch request to remote data node.
+     *
+     * @param batchId Id of a batch mini-future.
+     * @param nodeId Node id.
+     * @param batchFut Mini-future for the batch.
+     * @param clientFirst {@code true} if originating node is client and it is a first request to any data node.
+     */
+    private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException {
+        assert batchFut != null;
+
+        GridNearTxEnlistRequest req = new GridNearTxEnlistRequest(cctx.cacheId(),
+            threadId,
+            futId,
+            batchId,
+            tx.subjectId(),
+            topVer,
+            lockVer,
+            mvccSnapshot,
+            clientFirst,
+            remainingTime(),
+            tx.remainingTime(),
+            tx.taskNameHash(),
+            batchFut.rows(),
+            it.operation(),
+            needRes,
+            filter
+        );
+
+        sendRequest(req, nodeId);
+    }
+
+    /**
+     * @param req Request to send, right away or after the future from {@code awaitFinishAckAsync} completes.
+     * @param nodeId Remote node ID.
+     * @throws IgniteCheckedException If the immediate send failed (a deferred send failure fails this future instead).
+     */
+    private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException {
+        IgniteInternalFuture<?> txSync = cctx.tm().awaitFinishAckAsync(nodeId, tx.threadId());
+
+        if (txSync == null || txSync.isDone()) // Nothing to wait for: send in the caller's thread.
+            cctx.io().send(nodeId, req, cctx.ioPolicy());
+        else // Send from the listener once txSync completes.
+            txSync.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> future) {
+                    try {
+                        cctx.io().send(nodeId, req, cctx.ioPolicy());
+                    }
+                    catch (IgniteCheckedException e) {
+                        GridNearTxEnlistFuture.this.onDone(e); // Cannot rethrow from the listener.
+                    }
+                }
+            });
+    }
+
+    /**
+     * Enlists a batch of entries into the transaction on the local node through a {@code GridDhtTxEnlistFuture}.
+     *
+     * @param batchId Id of a batch mini-future.
+     * @param nodeId Node id.
+     * @param batch Batch.
+     */
+    private void enlistLocal(int batchId, UUID nodeId, Batch batch) throws IgniteCheckedException {
+        Collection<Object> rows = batch.rows();
+
+        GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId,
+            lockVer,
+            mvccSnapshot,
+            threadId,
+            futId,
+            batchId,
+            tx,
+            remainingTime(),
+            cctx,
+            rows,
+            it.operation(),
+            filter,
+            needRes);
+
+        updateLocalFuture(fut);
+
+        fut.listen(new CI1<IgniteInternalFuture<GridCacheReturn>>() {
+            @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut) {
+                try { // A response is built only when the local future finished without an error.
+                    clearLocalFuture((GridDhtTxAbstractEnlistFuture)fut);
+
+                    GridNearTxEnlistResponse res = fut.error() == null ? createResponse(fut) : null;
+
+                    if (checkResponse(nodeId, res, fut.error())) // Continue only after a successful check.
+                        sendNextBatches(nodeId);
+                }
+                catch (IgniteCheckedException e) {
+                    checkResponse(nodeId, null, e); // Route the failure through the common response check.
+                }
+                finally {
+                    CU.unwindEvicts(cctx);
+                }
+            }
+        });
+
+        fut.init(); // Started only after the listener is attached.
+    }
+
+    /**
+     * @param nodeId Sender node id.
+     * @param res Response.
+     */
+    public void onResult(UUID nodeId, GridNearTxEnlistResponse res) {
+        if (checkResponse(nodeId, res, res.error())) {
+
+            Batch batch = batches.get(nodeId);
+
+            if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null)
+                processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId());
+            else
+                sendNextBatches(nodeId);
+        }
+    }
+
+    /** {@inheritDoc} Returns {@code true} only if this future had a batch for the left node. */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        if (batches.containsKey(nodeId)) {
+            if (log.isDebugEnabled())
+                log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" + this + ']');
+
+            ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " +
+                "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']');
+
+            topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
+
+            processFailure(topEx, null);
+
+            batches.remove(nodeId);
+
+            if (batches.isEmpty()) // Wait for all pending requests.
+                onDone();
+
+            return true; // Handled: do not fall through to the "no mapping (ignoring)" log below.
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
+                ", fut=" + this + ']');
+
+        return false;
+    }
+
+    /**
+     * @param nodeId Originating node ID.
+     * @param res Response, may be {@code null} only if {@code err} is set.
+     * @param err Error, may be {@code null}; the response error (if any) is used then.
+     * @return {@code True} if no failure is recorded and the response result was stored, {@code false} otherwise.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean checkResponse(UUID nodeId, GridNearTxEnlistResponse res, Throwable err) {
+        assert res != null || err != null : this;
+
+        if (err == null && res.error() != null)
+            err = res.error();
+
+        if (X.hasCause(err, ClusterTopologyCheckedException.class))
+            tx.removeMapping(nodeId);
+
+        if (err != null)
+            processFailure(err, null);
+
+        if (ex != null) { // A failure is recorded (presumably by processFailure): drop this node's batch.
+            batches.remove(nodeId);
+
+            if (batches.isEmpty()) // Wait for all pending requests.
+                onDone();
+
+            return false;
+        }
+
+        assert res != null;
+
+        this.res = res.result();
+
+        assert this.res != null && (this.res.emptyResult() || needRes || !this.res.success());
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearTxEnlistFuture.class, this, super.toString());
+    }
+
+    /**
+     * A batch of rows for a single node; rows that have a local backup are additionally tracked separately.
+     */
+    private static class Batch {
+        /** Cluster node. */
+        @GridToStringExclude
+        private final ClusterNode node;
+
+        /** Rows. */
+        private List<Object> rows = new ArrayList<>();
+
+        /** Rows that have a local backup; {@code null} until the first such row is added. */
+        private List<Object> locBkpRows;
+
+        /** Readiness flag. Set when batch is full or no new rows are expected. */
+        private boolean ready;
+
+        /**
+         * @param node Cluster node.
+         */
+        private Batch(ClusterNode node) {
+            this.node = node;
+        }
+
+        /**
+         * @return Node.
+         */
+        public ClusterNode node() {
+            return node;
+        }
+
+        /**
+         * Adds a row.
+         *
+         * @param row Row.
+         * @param localBackup {@code true}, when the row key has local backup.
+         */
+        public void add(Object row, boolean localBackup) {
+            rows.add(row);
+
+            if (localBackup) {
+                if (locBkpRows == null) // Created lazily on the first row with a local backup.
+                    locBkpRows = new ArrayList<>();
+
+                locBkpRows.add(row);
+            }
+        }
+
+        /**
+         * @return Number of rows.
+         */
+        public int size() {
+            return rows.size();
+        }
+
+        /**
+         * @return All rows added so far (the internal list, not a copy).
+         */
+        public Collection<Object> rows() {
+            return rows;
+        }
+
+        /**
+         * @return Rows that have a local backup, or {@code null} if no such row was added.
+         */
+        public List<Object> localBackupRows() {
+            return locBkpRows;
+        }
+
+        /**
+         * @return Readiness flag.
+         */
+        public boolean ready() {
+            return ready;
+        }
+
+        /**
+         * Sets readiness flag.
+         *
+         * @param ready Flag value.
+         */
+        public void ready(boolean ready) {
+            this.ready = ready;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
new file mode 100644
index 0000000..1d87023
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Request to enlist into transaction and acquire locks for entries produced with Cache API operations.
+ *
+ * One request per batch of entries is used.
+ */
+public class GridNearTxEnlistRequest extends GridCacheIdMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long threadId;
+
+    /** Future id. */
+    private IgniteUuid futId;
+
+    /** */
+    private boolean clientFirst;
+
+    /** */
+    private int miniId;
+
+    /** */
+    private UUID subjId;
+
+    /** */
+    private AffinityTopologyVersion topVer;
+
+    /** */
+    private GridCacheVersion lockVer;
+
+    /** Mvcc snapshot. */
+    private MvccSnapshot mvccSnapshot;
+
+    /** */
+    private long timeout;
+
+    /** */
+    private long txTimeout;
+
+    /** */
+    private int taskNameHash;
+
+    /** Rows to enlist. */
+    @GridDirectTransient
+    private Collection<Object> rows;
+
+    /** Serialized rows keys. */
+    @GridToStringExclude
+    private KeyCacheObject[] keys;
+
+    /** Serialized rows values. */
+    @GridToStringExclude
+    private CacheObject[] values;
+
+    /** Enlist operation. */
+    private EnlistOperation op;
+
+    /** Filter. */
+    @GridToStringExclude
+    private CacheEntryPredicate filter;
+
+    /** Need previous value flag. */
+    private boolean needRes;
+
+    /**
+     * Default constructor.
+     */
+    public GridNearTxEnlistRequest() {
+        // No-op.
+    }
+
+    /**
+     * Creates a request that carries one batch of rows to enlist.
+     * @param cacheId Cache id.
+     * @param threadId Thread id.
+     * @param futId Future id.
+     * @param miniId Mini-future id.
+     * @param subjId Transaction subject id.
+     * @param topVer Topology version.
+     * @param lockVer Lock version.
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param clientFirst First client request flag.
+     * @param timeout Timeout.
+     * @param txTimeout Tx timeout.
+     * @param taskNameHash Task name hash.
+     * @param rows Rows.
+     * @param op Operation.
+     * @param needRes Need previous value flag.
+     * @param filter Filter.
+     */
+    GridNearTxEnlistRequest(int cacheId,
+        long threadId,
+        IgniteUuid futId,
+        int miniId,
+        UUID subjId,
+        AffinityTopologyVersion topVer,
+        GridCacheVersion lockVer,
+        MvccSnapshot mvccSnapshot,
+        boolean clientFirst,
+        long timeout,
+        long txTimeout,
+        int taskNameHash,
+        Collection<Object> rows,
+        EnlistOperation op,
+        boolean needRes,
+        @Nullable CacheEntryPredicate filter) {
+        this.cacheId = cacheId;
+        this.threadId = threadId;
+        this.futId = futId;
+        this.miniId = miniId;
+        this.subjId = subjId;
+        this.topVer = topVer;
+        this.lockVer = lockVer;
+        this.mvccSnapshot = mvccSnapshot;
+        this.clientFirst = clientFirst;
+        this.timeout = timeout;
+        this.txTimeout = txTimeout;
+        this.taskNameHash = taskNameHash;
+        this.rows = rows;
+        this.op = op;
+        this.needRes = needRes;
+        this.filter = filter;
+    }
+
+    /**
+     * @return Thread id.
+     */
+    public long threadId() {
+        return threadId;
+    }
+
+    /**
+     * @return Future id.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future ID.
+     */
+    public int miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Subject id.
+     */
+    public UUID subjectId() {
+        return subjId;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Lock version.
+     */
+    public GridCacheVersion version() {
+        return lockVer;
+    }
+
+    /**
+     * @return MVCC snapshot.
+     */
+    public MvccSnapshot mvccSnapshot() {
+        return mvccSnapshot;
+    }
+
+    /**
+     * @return Timeout milliseconds.
+     */
+    public long timeout() {
+        return timeout;
+    }
+
+    /**
+     * @return Tx timeout milliseconds.
+     */
+    public long txTimeout() {
+        return txTimeout;
+    }
+
+    /**
+     * @return Task name hash.
+     */
+    public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /**
+     * @return {@code True} if this is the first client request.
+     */
+    public boolean firstClientRequest() {
+        return clientFirst;
+    }
+
+    /**
+     * @return Collection of rows.
+     */
+    public Collection<Object> rows() {
+        return rows;
+    }
+
+    /**
+     * @return Operation.
+     */
+    public EnlistOperation operation() {
+        return op;
+    }
+
+    /**
+     * @return Need result flag.
+     */
+    public boolean needRes() {
+        return needRes;
+    }
+
+    /**
+     * @return Filter.
+     */
+    public CacheEntryPredicate filter() {
+        return filter;
+    }
+
+    /** {@inheritDoc} Also converts {@code rows} into the {@code keys}/{@code values} arrays that are written. */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+        CacheObjectContext objCtx = cctx.cacheObjectContext();
+
+        if (rows != null && keys == null) { // Skipped when keys are already populated.
+            keys = new KeyCacheObject[rows.size()];
+
+            int i = 0;
+
+            boolean keysOnly = op.isDeleteOrLock(); // If set, rows are bare keys rather than key-value tuples.
+
+            values = keysOnly ? null : new CacheObject[keys.length];
+
+            for (Object row : rows) {
+                Object key, val = null;
+
+                if (keysOnly)
+                    key = row;
+                else {
+                    key = ((IgniteBiTuple)row).getKey();
+                    val = ((IgniteBiTuple)row).getValue();
+                }
+
+                assert key != null && (keysOnly || val != null) : "key=" + key + ", val=" + val;
+
+                KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+                assert key0 != null;
+
+                key0.prepareMarshal(objCtx);
+
+                keys[i] = key0;
+
+                if (!keysOnly) {
+                    CacheObject val0 = cctx.toCacheObject(val);
+
+                    assert val0 != null;
+
+                    val0.prepareMarshal(objCtx);
+
+                    values[i] = val0; // Same index as the key of this row.
+                }
+
+                i++;
+            }
+        }
+
+        if (filter != null)
+            filter.prepareMarshal(cctx);
+    }
+
+    /** {@inheritDoc} Also rebuilds the transient {@code rows} collection from {@code keys}/{@code values}. */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        if (keys != null) {
+            rows = new ArrayList<>(keys.length);
+
+            CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext();
+
+            for (int i = 0; i < keys.length; i++) {
+                keys[i].finishUnmarshal(objCtx, ldr);
+
+                if (op.isDeleteOrLock()) // Delete/lock rows are bare keys.
+                    rows.add(keys[i]);
+                else { // Other rows are key-value tuples.
+                    if (values[i] != null)
+                        values[i].finishUnmarshal(objCtx, ldr);
+
+                    rows.add(new IgniteBiTuple<>(keys[i], values[i]));
+                }
+            }
+
+            keys = null; // The arrays are dropped once rows are restored.
+            values = null;
+        }
+
+        if (filter != null)
+            filter.finishUnmarshal(ctx.cacheContext(cacheId), ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("clientFirst", clientFirst))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("filter", filter))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeObjectArray("keys", keys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("lockVer", lockVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("needRes", needRes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeLong("threadId", threadId))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeLong("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 17:
+                if (!writer.writeLong("txTimeout", txTimeout))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeObjectArray("values", values, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                clientFirst = reader.readBoolean("clientFirst");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                filter = reader.readMessage("filter");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                keys = reader.readObjectArray("keys", MessageCollectionItemType.MSG, KeyCacheObject.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                lockVer = reader.readMessage("lockVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                miniId = reader.readInt("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                mvccSnapshot = reader.readMessage("mvccSnapshot");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                needRes = reader.readBoolean("needRes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = EnlistOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 12:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                threadId = reader.readLong("threadId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                timeout = reader.readLong("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 17:
+                txTimeout = reader.readLong("txTimeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                values = reader.readObjectArray("values", MessageCollectionItemType.MSG, CacheObject.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearTxEnlistRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 19;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 159;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearTxEnlistRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
new file mode 100644
index 0000000..4f4bbb6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.ExceptionAware;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A response to {@link GridNearTxEnlistRequest}.
+ */
+public class GridNearTxEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Error. */
+    @GridDirectTransient
+    private Throwable err;
+
+    /** Serialized error. */
+    private byte[] errBytes;
+
+    /** Mini future id. */
+    private int miniId;
+
+    /** Result. */
+    private GridCacheReturn res;
+
+    /** */
+    private GridCacheVersion lockVer;
+
+    /** */
+    private GridCacheVersion dhtVer;
+
+    /** */
+    private IgniteUuid dhtFutId;
+
+    /** New DHT nodes involved into transaction. */
+    @GridDirectCollection(UUID.class)
+    private Collection<UUID> newDhtNodes;
+
+    /**
+     * Default constructor.
+     */
+    public GridNearTxEnlistResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor for normal result.
+     *
+     * @param cacheId Cache id.
+     * @param futId Future id.
+     * @param miniId Mini future id.
+     * @param lockVer Lock version.
+     * @param res Result.
+     * @param dhtVer Dht version.
+     * @param dhtFutId Dht future id.
+     * @param newDhtNodes New DHT nodes involved into transaction.
+     */
+    public GridNearTxEnlistResponse(int cacheId,
+        IgniteUuid futId,
+        int miniId,
+        GridCacheVersion lockVer,
+        GridCacheReturn res,
+        GridCacheVersion dhtVer,
+        IgniteUuid dhtFutId,
+        Set<UUID> newDhtNodes) {
+        this.cacheId = cacheId;
+        this.futId = futId;
+        this.miniId = miniId;
+        this.lockVer = lockVer;
+        this.res = res;
+        this.dhtVer = dhtVer;
+        this.dhtFutId = dhtFutId;
+        this.newDhtNodes = newDhtNodes;
+    }
+
+    /**
+     * Constructor for error result.
+     *
+     * @param cacheId Cache id.
+     * @param futId Future id.
+     * @param miniId Mini future id.
+     * @param lockVer Lock version.
+     * @param err Error.
+     */
+    public GridNearTxEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer,
+        Throwable err) {
+        this.cacheId = cacheId;
+        this.futId = futId;
+        this.miniId = miniId;
+        this.lockVer = lockVer;
+        this.err = err;
+    }
+
+    /**
+     * @return Lock version.
+     */
+    public GridCacheVersion version() {
+        return lockVer;
+    }
+
+    /**
+     * @return Future id.
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Mini future id.
+     */
+    public int miniId() {
+        return miniId;
+    }
+
+    /**
+     * @return Result.
+     */
+    public GridCacheReturn result() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Throwable error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /**
+     * @return Dht version.
+     */
+    public GridCacheVersion dhtVersion() {
+        return dhtVer;
+    }
+
+    /**
+     * @return Dht future id.
+     */
+    public IgniteUuid dhtFutureId() {
+        return dhtFutId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 11;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets the buffer on the writer, delegates to the superclass, writes the message header (built from
+     * {@link #directType()} and {@link #fieldsCount()}) if the writer reports it has not been written yet,
+     * and then writes the fields of this class one by one.
+     * <p>
+     * The fields are written inside a {@code switch} on {@code writer.state()} whose cases fall through
+     * on purpose: entering at the current state continues with all following fields. The state is
+     * incremented after every successfully written field. Fields handled here, in order:
+     * {@code dhtFutId} (3), {@code dhtVer} (4), {@code errBytes} (5), {@code futId} (6), {@code lockVer} (7),
+     * {@code miniId} (8), {@code newDhtNodes} (9), {@code res} (10).
+     * <p>
+     * NOTE(review): states below 3 are presumably consumed by {@code super.writeTo(...)}, and a {@code false}
+     * result presumably means the buffer could not take more data and the call is repeated later with the
+     * same writer state - confirm against the writer implementation.
+     *
+     * @param buf Buffer to write to.
+     * @param writer Writer that tracks the header flag and the current field state.
+     * @return {@code false} as soon as the superclass call, the header write or any field write returns
+     *      {@code false}; {@code true} once execution reaches the end of the switch.
+     */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No 'break' statements: every case intentionally falls through to the next field.
+            case 3:
+                if (!writer.writeIgniteUuid("dhtFutId", dhtFutId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("dhtVer", dhtVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("lockVer", lockVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeInt("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeCollection("newDhtNodes", newDhtNodes, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeMessage("res", res))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Mirror of {@link #writeTo(ByteBuffer, MessageWriter)}: sets the buffer on the reader, calls
+     * {@code reader.beforeMessageRead()}, delegates to the superclass, and then reads the fields of this
+     * class in the same order and under the same state numbers as they are written:
+     * {@code dhtFutId} (3), {@code dhtVer} (4), {@code errBytes} (5), {@code futId} (6), {@code lockVer} (7),
+     * {@code miniId} (8), {@code newDhtNodes} (9), {@code res} (10).
+     * <p>
+     * The {@code switch} on {@code reader.state()} falls through on purpose. After each read the method
+     * checks {@code reader.isLastRead()} and returns {@code false} if it is not set; note that the field has
+     * already been assigned the value returned by the reader at that point. The state is incremented only
+     * after a read that passed this check.
+     * <p>
+     * NOTE(review): states below 3 are presumably consumed by {@code super.readFrom(...)}, and a {@code false}
+     * result presumably means more bytes are needed and the call is repeated later with the same reader
+     * state - confirm against the reader implementation.
+     *
+     * @param buf Buffer to read from.
+     * @param reader Reader that tracks the current field state.
+     * @return {@code false} if {@code beforeMessageRead()}, the superclass call or any field read did not
+     *      complete; otherwise the result of {@code reader.afterMessageRead(GridNearTxEnlistResponse.class)}.
+     */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) { // No 'break' statements: every case intentionally falls through to the next field.
+            case 3:
+                dhtFutId = reader.readIgniteUuid("dhtFutId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                dhtVer = reader.readMessage("dhtVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                lockVer = reader.readMessage("lockVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                miniId = reader.readInt("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                newDhtNodes = reader.readCollection("newDhtNodes", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                res = reader.readMessage("res");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearTxEnlistResponse.class);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the constant {@code 160}; {@link #writeTo(ByteBuffer, MessageWriter)} passes this value to
+     * {@code writer.writeHeader(...)}.
+     */
+    @Override public short directType() {
+        return 160;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        if (err != null && errBytes == null)
+            errBytes = U.marshal(ctx.marshaller(), err);
+
+        if (res != null)
+            res.prepareMarshal(cctx);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass step, restores the error from {@code errBytes} (when present) and finishes
+     * unmarshalling of the result, if any, with the cache context looked up by {@code cacheId}.
+     *
+     * @param ctx Shared cache context.
+     * @param ldr Class loader passed on to class loader resolution and to the result.
+     * @throws IgniteCheckedException Declared by the overridden method and by the unmarshalling calls.
+     */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+
+        if (errBytes != null) {
+            ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
+
+            err = U.unmarshal(ctx, errBytes, clsLdr);
+        }
+
+        if (res == null) {
+            return;
+        }
+
+        res.finishUnmarshal(cacheCtx, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearTxEnlistResponse.class, this);
+    }
+}