You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2014/12/23 21:36:13 UTC

[30/53] [abbrv] incubator-ignite git commit: GG-9141 - Renaming.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
new file mode 100644
index 0000000..e0effe0
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@ -0,0 +1,1493 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
+import static org.gridgain.grid.kernal.processors.cache.transactions.IgniteTxEx.FinalizationStatus.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+/**
+ * Isolated logic to process cache messages.
+ */
+public class IgniteTxHandler<K, V> {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Shared cache context. */
+    private GridCacheSharedContext<K, V> ctx;
+
+    public IgniteFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId,
+        final GridNearTxPrepareRequest<K, V> req) {
+        return prepareTx(nearNodeId, null, req);
+    }
+
+    /**
+     * @param ctx Shared cache context.
+     */
+    public IgniteTxHandler(GridCacheSharedContext<K, V> ctx) {
+        this.ctx = ctx;
+
+        log = ctx.logger(IgniteTxHandler.class);
+
+        ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+            @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse<K, V>)msg);
+            }
+        });
+
+        ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
+            new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest<K, V>>() {
+                @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+                    processCheckPreparedTxRequest(nodeId, req);
+                }
+            });
+
+        ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
+            new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse<K, V>>() {
+                @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+                    processCheckPreparedTxResponse(nodeId, res);
+                }
+            });
+
+        ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class,
+            new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() {
+                @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+                    processCheckCommittedTxRequest(nodeId, req);
+                }
+            });
+
+        ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class,
+            new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() {
+                @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+                    processCheckCommittedTxResponse(nodeId, res);
+                }
+            });
+    }
+
+    /**
+     * @param nearNodeId Near node ID that initiated transaction.
+     * @param locTx Optional local transaction.
+     * @param req Near prepare request.
+     * @return Future for transaction.
+     */
+    public IgniteFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx,
+        final GridNearTxPrepareRequest<K, V> req) {
+        assert nearNodeId != null;
+        assert req != null;
+
+        if (locTx != null) {
+            if (req.near()) {
+                // Make sure not to provide Near entries to DHT cache.
+                req.cloneEntries();
+
+                return prepareNearTx(nearNodeId, req);
+            }
+            else
+                return prepareColocatedTx(locTx, req);
+        }
+        else
+            return prepareNearTx(nearNodeId, req);
+    }
+
+    /**
+     * Prepares local colocated tx.
+     *
+     * @param locTx Local transaction.
+     * @param req Near prepare request.
+     * @return Prepare future.
+     */
+    private IgniteFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx,
+        final GridNearTxPrepareRequest<K, V> req) {
+
+        IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys.
+
+        return new GridEmbeddedFuture<>(
+            ctx.kernalContext(),
+            fut,
+            new C2<Object, Exception, IgniteFuture<IgniteTxEx<K, V>>>() {
+                @Override public IgniteFuture<IgniteTxEx<K, V>> apply(Object o, Exception ex) {
+                    if (ex != null)
+                        throw new GridClosureException(ex);
+
+                    IgniteFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(),
+                        req.transactionNodes(), req.last(), req.lastBackups());
+
+                    if (locTx.isRollbackOnly())
+                        locTx.rollbackAsync();
+
+                    return fut;
+                }
+            },
+            new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
+                @Nullable @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> tx, Exception e) {
+                    if (e != null) {
+                        // tx can be null of exception occurred.
+                        if (tx != null)
+                            tx.setRollbackOnly(); // Just in case.
+
+                        if (!(e instanceof IgniteTxOptimisticException))
+                            U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+                    }
+
+                    return tx;
+                }
+            }
+        );
+    }
+
+    /**
+     * Prepares near transaction.
+     *
+     * @param nearNodeId Near node ID that initiated transaction.
+     * @param req Near prepare request.
+     * @return Prepare future.
+     */
+    private IgniteFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId,
+        final GridNearTxPrepareRequest<K, V> req) {
+        ClusterNode nearNode = ctx.node(nearNodeId);
+
+        if (nearNode == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId);
+
+            return null;
+        }
+
+        try {
+            for (IgniteTxEntry<K, V> e : F.concat(false, req.reads(), req.writes()))
+                e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx.kernalContext(), e);
+        }
+
+        GridDhtTxLocal<K, V> tx;
+
+        GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
+
+        if (mappedVer != null) {
+            tx = ctx.tm().tx(mappedVer);
+
+            if (tx == null)
+                U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
+                    + ", mappedVer=" + mappedVer + ']');
+        }
+        else {
+            tx = new GridDhtTxLocal<>(
+                ctx,
+                nearNode.id(),
+                req.version(),
+                req.futureId(),
+                req.miniId(),
+                req.threadId(),
+                /*implicit*/false,
+                /*implicit-single*/false,
+                req.system(),
+                req.concurrency(),
+                req.isolation(),
+                req.timeout(),
+                req.isInvalidate(),
+                false,
+                req.txSize(),
+                req.groupLockKey(),
+                req.partitionLock(),
+                req.transactionNodes(),
+                req.subjectId(),
+                req.taskNameHash()
+            );
+
+            tx = ctx.tm().onCreated(tx);
+
+            if (tx != null)
+                tx.topologyVersion(req.topologyVersion());
+            else
+                U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+                    tx.xid() + ", req=" + req + ']');
+        }
+
+        if (tx != null) {
+            IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
+                req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
+                req.lastBackups());
+
+            if (tx.isRollbackOnly()) {
+                try {
+                    tx.rollback();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to rollback transaction: " + tx, e);
+                }
+            }
+
+            final GridDhtTxLocal<K, V> tx0 = tx;
+
+            fut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() {
+                @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> txFut) {
+                    try {
+                        txFut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        tx0.setRollbackOnly(); // Just in case.
+
+                        if (!(e instanceof IgniteTxOptimisticException))
+                            U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
+                    }
+                }
+            });
+
+            return fut;
+        }
+        else
+            return new GridFinishedFuture<>(ctx.kernalContext(), (IgniteTxEx<K, V>)null);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
+        GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc()
+            .<IgniteTxEx<K, V>>future(res.version(), res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
+        ctx.tm().onFinishedRemote(nodeId, res.threadId());
+
+        GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(
+            res.xid(), res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']');
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
+        GridDhtTxPrepareFuture<K, V> fut = (GridDhtTxPrepareFuture<K, V>)ctx.mvcc().
+            <IgniteTxEx<K, V>>future(res.version(), res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received response for unknown future (will ignore): " + res);
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
+        assert nodeId != null;
+        assert res != null;
+
+        GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(res.xid(),
+            res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received response for unknown future (will ignore): " + res);
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     * @return Future.
+     */
+    @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) {
+        return finish(nodeId, null, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     * @return Future.
+     */
+    @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+        GridNearTxFinishRequest<K, V> req) {
+        assert nodeId != null;
+        assert req != null;
+
+        // Transaction on local cache only.
+        if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
+            return new GridFinishedFutureEx<IgniteTx>(locTx);
+
+        if (log.isDebugEnabled())
+            log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
+
+        IgniteFuture<IgniteTx> colocatedFinishFut = null;
+
+        if (locTx != null && locTx.colocatedLocallyMapped())
+            colocatedFinishFut = finishColocatedLocal(req.commit(), locTx);
+
+        IgniteFuture<IgniteTx> nearFinishFut = null;
+
+        if (locTx == null || locTx.nearLocallyMapped()) {
+            if (locTx != null)
+                req.cloneEntries();
+
+            nearFinishFut = finishDhtLocal(nodeId, locTx, req);
+        }
+
+        if (colocatedFinishFut != null && nearFinishFut != null) {
+            GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext());
+
+            res.add(colocatedFinishFut);
+            res.add(nearFinishFut);
+
+            res.markInitialized();
+
+            return res;
+        }
+
+        if (colocatedFinishFut != null)
+            return colocatedFinishFut;
+
+        return nearFinishFut;
+    }
+
+    /**
+     * @param nodeId Node ID initiated commit.
+     * @param locTx Optional local transaction.
+     * @param req Finish request.
+     * @return Finish future.
+     */
+    private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+        GridNearTxFinishRequest<K, V> req) {
+        GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+
+        GridDhtTxLocal<K, V> tx = null;
+
+        if (dhtVer == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req);
+        }
+        else
+            tx = ctx.tm().tx(dhtVer);
+
+        if (tx == null && !req.explicitLock()) {
+            assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx;
+
+            U.warn(log, "Received finish request for completed transaction (the message may be too late " +
+                "and transaction could have been DGCed by now) [commit=" + req.commit() +
+                ", xid=" + req.version() + ']');
+
+            // Always send finish response.
+            GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(),
+                req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+
+            try {
+                ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+            }
+            catch (Throwable e) {
+                // Double-check.
+                if (ctx.discovery().node(nodeId) == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
+                            ']');
+                }
+                else
+                    U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
+                        "res=" + res + ']', e);
+            }
+
+            return null;
+        }
+
+        try {
+            if (req.commit()) {
+                if (tx == null) {
+                    // Create transaction and add entries.
+                    tx = ctx.tm().onCreated(
+                        new GridDhtTxLocal<>(
+                            ctx,
+                            nodeId,
+                            req.version(),
+                            req.futureId(),
+                            req.miniId(),
+                            req.threadId(),
+                            true,
+                            false, /* we don't know, so assume false. */
+                            req.system(),
+                            PESSIMISTIC,
+                            READ_COMMITTED,
+                            /*timeout */0,
+                            req.isInvalidate(),
+                            req.storeEnabled(),
+                            req.txSize(),
+                            req.groupLockKey(),
+                            false,
+                            null,
+                            req.subjectId(),
+                            req.taskNameHash()));
+
+                    if (tx == null || !ctx.tm().onStarted(tx))
+                        throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + req);
+
+                    tx.topologyVersion(req.topologyVersion());
+                }
+
+                tx.storeEnabled(req.storeEnabled());
+
+                if (!tx.markFinalizing(USER_FINISH)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+
+                    return null;
+                }
+
+                if (!tx.syncCommit())
+                    tx.syncCommit(req.syncCommit());
+
+                tx.nearFinishFutureId(req.futureId());
+                tx.nearFinishMiniId(req.miniId());
+                tx.recoveryWrites(req.recoveryWrites());
+
+                Collection<IgniteTxEntry<K, V>> writeEntries = req.writes();
+
+                if (!F.isEmpty(writeEntries)) {
+                    // In OPTIMISTIC mode, we get the values at PREPARE stage.
+                    assert tx.concurrency() == PESSIMISTIC;
+
+                    for (IgniteTxEntry<K, V> entry : writeEntries)
+                        tx.addEntry(req.messageId(), entry);
+                }
+
+                if (tx.pessimistic())
+                    tx.prepare();
+
+                IgniteFuture<IgniteTx> commitFut = tx.commitAsync();
+
+                // Only for error logging.
+                commitFut.listenAsync(CU.errorLogger(log));
+
+                return commitFut;
+            }
+            else {
+                assert tx != null : "Transaction is null for near rollback request [nodeId=" +
+                    nodeId + ", req=" + req + "]";
+
+                tx.syncRollback(req.syncRollback());
+
+                tx.nearFinishFutureId(req.futureId());
+                tx.nearFinishMiniId(req.miniId());
+
+                IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+
+                // Only for error logging.
+                rollbackFut.listenAsync(CU.errorLogger(log));
+
+                return rollbackFut;
+            }
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+
+            if (tx != null) {
+                IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+
+                // Only for error logging.
+                rollbackFut.listenAsync(CU.errorLogger(log));
+
+                return rollbackFut;
+            }
+
+            return new GridFinishedFuture<>(ctx.kernalContext(), e);
+        }
+    }
+
+    /**
+     * @param commit Commit flag (rollback if {@code false}).
+     * @param tx Transaction to commit.
+     * @return Future.
+     */
+    public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) {
+        try {
+            if (commit) {
+                if (!tx.markFinalizing(USER_FINISH)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+
+                    return null;
+                }
+
+                return tx.commitAsyncLocal();
+            }
+            else
+                return tx.rollbackAsyncLocal();
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
+
+            if (tx != null)
+                return tx.rollbackAsync();
+
+            return new GridFinishedFuture<>(ctx.kernalContext(), e);
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param req Request.
+     */
+    protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest<K, V> req) {
+        assert nodeId != null;
+        assert req != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() +
+                ", nodeId=" + nodeId + ", req=" + req + ']');
+
+        GridDhtTxRemote<K, V> dhtTx = null;
+        GridNearTxRemote<K, V> nearTx = null;
+
+        GridDhtTxPrepareResponse<K, V> res;
+
+        try {
+            res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId());
+
+            // Start near transaction first.
+            nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
+            dhtTx = startRemoteTx(nodeId, req, res);
+
+            // Set evicted keys from near transaction.
+            if (nearTx != null)
+                res.nearEvicted(nearTx.evicted());
+
+            if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
+                res.invalidPartitions(dhtTx.invalidPartitions());
+        }
+        catch (IgniteCheckedException e) {
+            if (e instanceof IgniteTxRollbackException)
+                U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e);
+            else if (e instanceof IgniteTxOptimisticException) {
+                if (log.isDebugEnabled())
+                    log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx);
+            }
+            else
+                U.error(log, "Failed to process prepare request: " + req, e);
+
+            if (nearTx != null)
+                nearTx.rollback();
+
+            if (dhtTx != null)
+                dhtTx.rollback();
+
+            res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e);
+        }
+
+        try {
+            // Reply back to sender.
+            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyException) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId +
+                        ", xid=" + req.version());
+            }
+            else
+                U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId +
+                    ", xid=" + req.version() + ", err=" +  e.getMessage() + ']');
+
+            if (nearTx != null)
+                nearTx.rollback();
+
+            if (dhtTx != null)
+                dhtTx.rollback();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    @SuppressWarnings({"unchecked"})
+    protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest<K, V> req) {
+        assert nodeId != null;
+        assert req != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
+
+        GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version());
+        GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version());
+
+        try {
+            if (dhtTx == null && !F.isEmpty(req.writes()))
+                dhtTx = startRemoteTxForFinish(nodeId, req);
+
+            if (dhtTx != null) {
+                dhtTx.syncCommit(req.syncCommit());
+                dhtTx.syncRollback(req.syncRollback());
+            }
+
+            // One-phase commit transactions send finish requests to backup nodes.
+            if (dhtTx != null && req.onePhaseCommit()) {
+                dhtTx.onePhaseCommit(true);
+
+                dhtTx.writeVersion(req.writeVersion());
+            }
+
+            if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock())
+                nearTx = startNearRemoteTxForFinish(nodeId, req);
+
+            if (nearTx != null) {
+                nearTx.syncCommit(req.syncCommit());
+                nearTx.syncRollback(req.syncRollback());
+            }
+        }
+        catch (IgniteTxRollbackException e) {
+            if (log.isDebugEnabled())
+                log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" +
+                    e.getMessage() + ']');
+
+            sendReply(nodeId, req);
+
+            return;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" +
+                dhtTx + ", nearTx=" + nearTx + ']', e);
+
+            if (dhtTx != null)
+                dhtTx.invalidate(true);
+
+            if (nearTx != null)
+                nearTx.invalidate(true);
+        }
+        catch (GridDistributedLockCancelledException ignore) {
+            U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" +
+                dhtTx + ", nearTx=" + nearTx + ']');
+
+            if (dhtTx != null)
+                dhtTx.invalidate(true);
+
+            if (nearTx != null)
+                nearTx.invalidate(true);
+        }
+
+        // Safety - local transaction will finish explicitly.
+        if (nearTx != null && nearTx.local())
+            nearTx = null;
+
+        finish(nodeId, dhtTx, req, req.writes());
+
+        if (nearTx != null)
+            finish(nodeId, nearTx, req, req.nearWrites());
+
+        sendReply(nodeId, req);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param tx Transaction.
+     * @param req Request.
+     * @param writes Writes.
+     */
+    protected void finish(
+        UUID nodeId,
+        IgniteTxRemoteEx<K, V> tx,
+        GridDhtTxFinishRequest<K, V> req,
+        Collection<IgniteTxEntry<K, V>> writes) {
+        // We don't allow explicit locks for transactions and
+        // therefore immediately return if transaction is null.
+        // However, we may decide to relax this restriction in
+        // future.
+        if (tx == null) {
+            if (req.commit())
+                // Must be some long time duplicate, but we add it anyway.
+                ctx.tm().addCommittedTx(req.version(), null);
+            else
+                ctx.tm().addRolledbackTx(req.version());
+
+            if (log.isDebugEnabled())
+                log.debug("Received finish request for non-existing transaction (added to completed set) " +
+                    "[senderNodeId=" + nodeId + ", res=" + req + ']');
+
+            return;
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
+                ", tx=" + tx + ']');
+
+        try {
+            if (req.commit() || req.isSystemInvalidate()) {
+                if (tx.commitVersion(req.commitVersion())) {
+                    tx.invalidate(req.isInvalidate());
+                    tx.systemInvalidate(req.isSystemInvalidate());
+
+                    if (!F.isEmpty(writes)) {
+                        // In OPTIMISTIC mode, we get the values at PREPARE stage.
+                        assert tx.concurrency() == PESSIMISTIC;
+
+                        for (IgniteTxEntry<K, V> entry : writes) {
+                            if (log.isDebugEnabled())
+                                log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" +
+                                    entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']');
+
+                            if (!tx.setWriteValue(entry))
+                                U.warn(log, "Received entry to commit that was not present in transaction [entry=" +
+                                    entry + ", tx=" + tx + ']');
+                        }
+                    }
+
+                    // Complete remote candidates.
+                    tx.doneRemote(req.baseVersion(), null, null, null);
+
+                    if (tx.pessimistic())
+                        tx.prepare();
+
+                    tx.commit();
+                }
+            }
+            else {
+                assert tx != null;
+
+                tx.doneRemote(req.baseVersion(), null, null, null);
+
+                tx.rollback();
+            }
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+
+            // Mark transaction for invalidate.
+            tx.invalidate(true);
+            tx.systemInvalidate(true);
+
+            try {
+                tx.commit();
+            }
+            catch (IgniteCheckedException ex) {
+                U.error(log, "Failed to invalidate transaction: " + tx, ex);
+            }
+        }
+    }
+
+    /**
+     * Sends tx finish response to remote node, if response is requested.
+     *
+     * @param nodeId Node id that originated finish request.
+     * @param req Request.
+     */
+    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) {
+        if (req.replyRequired()) {
+            GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
+
+            try {
+                ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+            }
+            catch (Throwable e) {
+                // Double-check.
+                if (ctx.discovery().node(nodeId) == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
+                }
+                else
+                    U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     * @param res Response.
+     * @return Remote transaction.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
+        GridDhtTxPrepareRequest<K, V> req,
+        GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+        if (!F.isEmpty(req.writes())) {
+            GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version());
+
+            assert F.isEmpty(req.candidatesByKey());
+
+            if (tx == null) {
+                tx = new GridDhtTxRemote<>(
+                    ctx,
+                    req.nearNodeId(),
+                    req.futureId(),
+                    nodeId,
+                    req.threadId(),
+                    req.topologyVersion(),
+                    req.version(),
+                    req.commitVersion(),
+                    req.system(),
+                    req.concurrency(),
+                    req.isolation(),
+                    req.isInvalidate(),
+                    req.timeout(),
+                    req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
+                    req.groupLockKey(),
+                    req.nearXidVersion(),
+                    req.transactionNodes(),
+                    req.subjectId(),
+                    req.taskNameHash());
+
+                tx = ctx.tm().onCreated(tx);
+
+                if (tx == null || !ctx.tm().onStarted(tx)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Attempt to start a completed transaction (will ignore): " + tx);
+
+                    return null;
+                }
+            }
+
+            if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
+                int idx = 0;
+
+                for (IgniteTxEntry<K, V> entry : req.writes()) {
+                    GridCacheContext<K, V> cacheCtx = entry.context();
+
+                    tx.addWrite(entry, ctx.deploy().globalLoader());
+
+                    if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx))
+                        invalidateNearEntry(cacheCtx, entry.key(), req.version());
+
+                    try {
+                        if (req.needPreloadKey(idx)) {
+                            GridCacheEntryEx<K, V> cached = entry.cached();
+
+                            if (cached == null)
+                                cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+                            GridCacheEntryInfo<K, V> info = cached.info();
+
+                            if (info != null && !info.isNew() && !info.isDeleted())
+                                res.addPreloadEntry(info);
+                        }
+                    }
+                    catch (GridDhtInvalidPartitionException e) {
+                        tx.addInvalidPartition(cacheCtx, e.partition());
+
+                        tx.clearEntry(entry.txKey());
+                    }
+
+                    idx++;
+                }
+            }
+
+            // Prepare prior to reordering, so the pending locks added
+            // in prepare phase will get properly ordered as well.
+            tx.prepare();
+
+            if (req.last())
+                tx.state(PREPARED);
+
+            res.invalidPartitions(tx.invalidPartitions());
+
+            if (tx.empty() && req.last()) {
+                tx.rollback();
+
+                return null;
+            }
+
+            return tx;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param key Key
+     * @param ver Version.
+     * @throws IgniteCheckedException If invalidate failed.
+     */
+    private void invalidateNearEntry(GridCacheContext<K, V> cacheCtx, K key, GridCacheVersion ver)
+        throws IgniteCheckedException {
+        GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
+
+        GridCacheEntryEx<K, V> nearEntry = near.peekEx(key);
+
+        if (nearEntry != null)
+            nearEntry.invalidate(null, ver);
+    }
+
+    /**
+     * Called while processing dht tx prepare request.
+     *
+     * @param ldr Loader.
+     * @param nodeId Sender node ID.
+     * @param req Request.
+     * @return Remote transaction.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public GridNearTxRemote<K, V> startNearRemoteTx(ClassLoader ldr, UUID nodeId,
+        GridDhtTxPrepareRequest<K, V> req) throws IgniteCheckedException {
+        assert F.isEmpty(req.candidatesByKey());
+
+        if (!F.isEmpty(req.nearWrites())) {
+            GridNearTxRemote<K, V> tx = ctx.tm().nearTx(req.version());
+
+            if (tx == null) {
+                tx = new GridNearTxRemote<>(
+                    ctx,
+                    ldr,
+                    nodeId,
+                    req.nearNodeId(),
+                    req.threadId(),
+                    req.version(),
+                    req.commitVersion(),
+                    req.system(),
+                    req.concurrency(),
+                    req.isolation(),
+                    req.isInvalidate(),
+                    req.timeout(),
+                    req.nearWrites(),
+                    req.txSize(),
+                    req.groupLockKey(),
+                    req.subjectId(),
+                    req.taskNameHash()
+                );
+
+                if (!tx.empty()) {
+                    tx = ctx.tm().onCreated(tx);
+
+                    if (tx == null || !ctx.tm().onStarted(tx))
+                        throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + tx);
+                }
+            }
+            else
+                tx.addEntries(ldr, req.nearWrites());
+
+            tx.ownedVersions(req.owned());
+
+            // Prepare prior to reordering, so the pending locks added
+            // in prepare phase will get properly ordered as well.
+            tx.prepare();
+
+            return tx;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param nodeId Primary node ID.
+     * @param req Request.
+     * @return Remote transaction.
+     * @throws IgniteCheckedException If failed.
+     * @throws GridDistributedLockCancelledException If lock has been cancelled.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+        throws IgniteCheckedException, GridDistributedLockCancelledException {
+
+        GridDhtTxRemote<K, V> tx = null;
+
+        boolean marked = false;
+
+        for (IgniteTxEntry<K, V> txEntry : req.writes()) {
+            GridDistributedCacheEntry<K, V> entry = null;
+
+            GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+            while (true) {
+                try {
+                    int part = cacheCtx.affinity().partition(txEntry.key());
+
+                    GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part,
+                        req.topologyVersion(), false);
+
+                    // Handle implicit locks for pessimistic transactions.
+                    if (tx == null)
+                        tx = ctx.tm().tx(req.version());
+
+                    if (locPart == null || !locPart.reserve()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Local partition for given key is already evicted (will remove from tx) " +
+                                "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']');
+
+                        if (tx != null)
+                            tx.clearEntry(txEntry.txKey());
+
+                        break;
+                    }
+
+                    try {
+                        entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(),
+                            req.topologyVersion());
+
+                        if (tx == null) {
+                            tx = new GridDhtTxRemote<>(
+                                ctx,
+                                req.nearNodeId(),
+                                req.futureId(),
+                                nodeId,
+                                // We can pass null as nearXidVersion as transaction will be committed right away.
+                                null,
+                                req.threadId(),
+                                req.topologyVersion(),
+                                req.version(),
+                                /*commitVer*/null,
+                                req.system(),
+                                PESSIMISTIC,
+                                req.isolation(),
+                                req.isInvalidate(),
+                                0,
+                                req.txSize(),
+                                req.groupLockKey(),
+                                req.subjectId(),
+                                req.taskNameHash());
+
+                            tx = ctx.tm().onCreated(tx);
+
+                            if (tx == null || !ctx.tm().onStarted(tx))
+                                throw new IgniteTxRollbackException("Failed to acquire lock " +
+                                    "(transaction has been completed): " + req.version());
+                        }
+
+                        tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
+                            txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
+
+                        if (!marked) {
+                            if (tx.markFinalizing(USER_FINISH))
+                                marked = true;
+                            else {
+                                tx.clearEntry(txEntry.txKey());
+
+                                return null;
+                            }
+                        }
+
+                        // Add remote candidate before reordering.
+                        if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+                            entry.addRemote(
+                                req.nearNodeId(),
+                                nodeId,
+                                req.threadId(),
+                                req.version(),
+                                0,
+                                /*tx*/true,
+                                tx.implicitSingle(),
+                                null
+                            );
+
+                        // Double-check in case if sender node left the grid.
+                        if (ctx.discovery().node(req.nearNodeId()) == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+
+                            tx.rollback();
+
+                            return null;
+                        }
+
+                        // Entry is legit.
+                        break;
+                    }
+                    finally {
+                        locPart.release();
+                    }
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+                        entry;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+
+                    tx.clearEntry(txEntry.txKey());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+                            entry + ", tx=" + tx + ']');
+                }
+                catch (GridDhtInvalidPartitionException p) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" +
+                            req + ", txEntry=" + txEntry + ']');
+
+                    if (tx != null)
+                        tx.clearEntry(txEntry.txKey());
+
+                    break;
+                }
+            }
+        }
+
+        if (tx != null && tx.empty()) {
+            tx.rollback();
+
+            return null;
+        }
+
+        return tx;
+    }
+
+    /**
+     * @param nodeId Primary node ID.
+     * @param req Request.
+     * @return Remote transaction.
+     * @throws IgniteCheckedException If failed.
+     * @throws GridDistributedLockCancelledException If lock has been cancelled.
+     */
+    @SuppressWarnings({"RedundantTypeArguments"})
+    @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+        throws IgniteCheckedException, GridDistributedLockCancelledException {
+        assert req.groupLock();
+
+        GridNearTxRemote<K, V> tx = null;
+
+        ClassLoader ldr = ctx.deploy().globalLoader();
+
+        if (ldr != null) {
+            boolean marked = false;
+
+            for (IgniteTxEntry<K, V> txEntry : req.nearWrites()) {
+                GridDistributedCacheEntry<K, V> entry = null;
+
+                GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+                while (true) {
+                    try {
+                        entry = cacheCtx.near().peekExx(txEntry.key());
+
+                        if (entry != null) {
+                            entry.keyBytes(txEntry.keyBytes());
+
+                            // Handle implicit locks for pessimistic transactions.
+                            if (tx == null)
+                                tx = ctx.tm().nearTx(req.version());
+
+                            if (tx == null) {
+                                tx = new GridNearTxRemote<>(
+                                    ctx,
+                                    nodeId,
+                                    req.nearNodeId(),
+                                    // We can pass null as nearXidVer as transaction will be committed right away.
+                                    null,
+                                    req.threadId(),
+                                    req.version(),
+                                    null,
+                                    req.system(),
+                                    PESSIMISTIC,
+                                    req.isolation(),
+                                    req.isInvalidate(),
+                                    0,
+                                    req.txSize(),
+                                    req.groupLockKey(),
+                                    req.subjectId(),
+                                    req.taskNameHash());
+
+                                tx = ctx.tm().onCreated(tx);
+
+                                if (tx == null || !ctx.tm().onStarted(tx))
+                                    throw new IgniteTxRollbackException("Failed to acquire lock " +
+                                        "(transaction has been completed): " + req.version());
+
+                                if (!marked)
+                                    marked = tx.markFinalizing(USER_FINISH);
+
+                                if (!marked)
+                                    return null;
+                            }
+
+                            if (tx.local())
+                                return null;
+
+                            if (!marked)
+                                marked = tx.markFinalizing(USER_FINISH);
+
+                            if (marked)
+                                tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(),
+                                    txEntry.valueBytes(), txEntry.drVersion());
+                            else
+                                return null;
+
+                            if (req.groupLock()) {
+                                tx.markGroupLock();
+
+                                if (!txEntry.groupLockEntry())
+                                    tx.groupLockKey(txEntry.txKey());
+                            }
+
+                            // Add remote candidate before reordering.
+                            if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+                                entry.addRemote(
+                                    req.nearNodeId(),
+                                    nodeId,
+                                    req.threadId(),
+                                    req.version(),
+                                    0,
+                                    /*tx*/true,
+                                    tx.implicitSingle(),
+                                    null
+                                );
+                        }
+
+                        // Double-check in case if sender node left the grid.
+                        if (ctx.discovery().node(req.nearNodeId()) == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+
+                            if (tx != null)
+                                tx.rollback();
+
+                            return null;
+                        }
+
+                        // Entry is legit.
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+                            entry;
+
+                        if (log.isDebugEnabled())
+                            log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+
+                        if (tx != null) {
+                            tx.clearEntry(txEntry.txKey());
+
+                            if (log.isDebugEnabled())
+                                log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+                                    entry + ", tx=" + tx + ']');
+                        }
+
+                        // Will retry in while loop.
+                    }
+                }
+            }
+        }
+        else {
+            String err = "Failed to acquire deployment class loader for message: " + req;
+
+            U.warn(log, err);
+
+            throw new IgniteCheckedException(err);
+        }
+
+        return tx;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+        if (log.isDebugEnabled())
+            log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
+
+        boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+
+        GridCacheOptimisticCheckPreparedTxResponse<K, V> res =
+            new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), req.futureId(), req.miniId(), prepared);
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+            ctx.io().send(nodeId, res);
+        }
+        catch (ClusterTopologyException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" +
+                    nodeId + ", res=" + res + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+        if (log.isDebugEnabled())
+            log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+        GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = (GridCacheOptimisticCheckPreparedTxFuture<K, V>)ctx.mvcc().
+            <Boolean>future(res.version(), res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received response for unknown future (will ignore): " + res);
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     */
+    protected void processCheckCommittedTxRequest(final UUID nodeId,
+        final GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+        if (log.isDebugEnabled())
+            log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']');
+
+        IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req);
+
+        infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() {
+            @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) {
+                GridCacheCommittedTxInfo<K, V> info = null;
+
+                try {
+                    info = infoFut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e);
+                }
+
+                GridCachePessimisticCheckCommittedTxResponse<K, V>
+                    res = new GridCachePessimisticCheckCommittedTxResponse<>(
+                    req.version(), req.futureId(), req.miniId(), info);
+
+                if (log.isDebugEnabled())
+                    log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']');
+
+                sendCheckCommittedResponse(nodeId, res);            }
+        });
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    protected void processCheckCommittedTxResponse(UUID nodeId,
+        GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+        if (log.isDebugEnabled())
+            log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+        GridCachePessimisticCheckCommittedTxFuture<K, V> fut =
+            (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future(
+                res.version(), res.futureId());
+
+        if (fut == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received response for unknown future (will ignore): " + res);
+
+            return;
+        }
+
+        fut.onResult(nodeId, res);
+    }
+
+    /**
+     * Sends check committed response to remote node.
+     *
+     * @param nodeId Node ID to send to.
+     * @param res Reponse to send.
+     */
+    private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+            ctx.io().send(nodeId, res);
+        }
+        catch (ClusterTopologyException ignored) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" +
+                    nodeId + ", res=" + res + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxKey.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxKey.java
new file mode 100644
index 0000000..e2950c0
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxKey.java
@@ -0,0 +1,97 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.transactions;
+
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction
+ * for multiple caches.
+ */
+public class IgniteTxKey<K> implements Externalizable {
+    /** Key. */
+    @GridToStringInclude
+    private K key;
+
+    /** Cache ID. */
+    private int cacheId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public IgniteTxKey() {
+        // No-op.
+    }
+
+    /**
+     * @param key User key.
+     * @param cacheId Cache ID.
+     */
+    public IgniteTxKey(K key, int cacheId) {
+        this.key = key;
+        this.cacheId = cacheId;
+    }
+
+    /**
+     * @return User key.
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public int cacheId() {
+        return cacheId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof IgniteTxKey))
+            return false;
+
+        IgniteTxKey that = (IgniteTxKey)o;
+
+        return cacheId == that.cacheId && key.equals(that.key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = key.hashCode();
+
+        res = 31 * res + cacheId;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cacheId);
+        out.writeObject(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        cacheId = in.readInt();
+        key = (K)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteTxKey.class, this);
+    }
+}