You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:47 UTC

[45/50] [abbrv] incubator-ignite git commit: # ignite-41

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f537940c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 0000000,e0effe0..2254ddb
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1,0 -1,1493 +1,1504 @@@
+ /* @java.file.header */
+ 
+ /*  _________        _____ __________________        _____
+  *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+  *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+  *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+  *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+  */
+ 
+ package org.gridgain.grid.kernal.processors.cache.transactions;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.gridgain.grid.kernal.processors.cache.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
++import org.gridgain.grid.util.*;
+ import org.gridgain.grid.util.future.*;
+ import org.gridgain.grid.util.lang.*;
+ import org.gridgain.grid.util.typedef.*;
+ import org.gridgain.grid.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ 
+ import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+ import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
+ import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
+ import static org.gridgain.grid.kernal.processors.cache.transactions.IgniteTxEx.FinalizationStatus.*;
+ import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+ 
+ /**
+  * Isolated logic to process cache messages.
+  */
+ public class IgniteTxHandler<K, V> {
+     /** Logger. */
+     private IgniteLogger log;
+ 
+     /** Shared cache context. */
+     private GridCacheSharedContext<K, V> ctx;
+ 
+     /**
+      * Handles a near prepare request by delegating to
+      * {@link #prepareTx(UUID, GridNearTxLocal, GridNearTxPrepareRequest)} without a local transaction.
+      *
+      * @param nearNodeId Near node ID that initiated transaction.
+      * @param req Near prepare request.
+      * @return Future for transaction.
+      */
+     public IgniteFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId,
+         final GridNearTxPrepareRequest<K, V> req) {
+         IgniteFuture<IgniteTxEx<K, V>> prepFut = prepareTx(nearNodeId, null, req);
+ 
+         return prepFut;
+     }
+ 
+     /**
+      * Creates the handler and registers, through {@code ctx.io()}, one message handler per
+      * transaction message class: near and DHT prepare/finish requests and responses, plus the
+      * optimistic "check prepared" and pessimistic "check committed" requests and responses.
+      * Each registered closure simply forwards to the matching {@code process*} method.
+      *
+      * @param ctx Shared cache context.
+      */
+     public IgniteTxHandler(GridCacheSharedContext<K, V> ctx) {
+         this.ctx = ctx;
+ 
+         log = ctx.logger(IgniteTxHandler.class);
+ 
+         // NOTE(review): the first argument (0) is presumably a cache/handler ID - confirm against addHandler().
+         ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest<K, V>)msg);
+             }
+         });
+ 
+         ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+             @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+                 processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse<K, V>)msg);
+             }
+         });
+ 
+         // The transaction check handlers below are typed on the concrete message class, so no cast is needed.
+         ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
+             new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest<K, V>>() {
+                 @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+                     processCheckPreparedTxRequest(nodeId, req);
+                 }
+             });
+ 
+         ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
+             new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse<K, V>>() {
+                 @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+                     processCheckPreparedTxResponse(nodeId, res);
+                 }
+             });
+ 
+         ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class,
+             new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() {
+                 @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+                     processCheckCommittedTxRequest(nodeId, req);
+                 }
+             });
+ 
+         ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class,
+             new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() {
+                 @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+                     processCheckCommittedTxResponse(nodeId, res);
+                 }
+             });
+     }
+ 
+     /**
+      * Chooses the prepare path for a near prepare request: without a local transaction, or with a
+      * local transaction and a near request, the near path is used (entries are cloned first in the
+      * latter case); with a local transaction and a non-near request the colocated path is used.
+      *
+      * @param nearNodeId Near node ID that initiated transaction.
+      * @param locTx Optional local transaction.
+      * @param req Near prepare request.
+      * @return Future for transaction.
+      */
+     public IgniteFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx,
+         final GridNearTxPrepareRequest<K, V> req) {
+         assert nearNodeId != null;
+         assert req != null;
+ 
+         // No local transaction was supplied: go straight to the near path.
+         if (locTx == null)
+             return prepareNearTx(nearNodeId, req);
+ 
+         if (!req.near())
+             return prepareColocatedTx(locTx, req);
+ 
+         // Make sure not to provide Near entries to DHT cache.
+         req.cloneEntries();
+ 
+         return prepareNearTx(nearNodeId, req);
+     }
+ 
+     /**
+      * Prepares local colocated tx.
+      * <p>
+      * The work is chained onto an already finished placeholder future: the first closure starts
+      * the local prepare (and triggers an asynchronous rollback if the transaction is already marked
+      * rollback-only), the second closure post-processes the prepare result and logs failures.
+      *
+      * @param locTx Local transaction.
+      * @param req Near prepare request.
+      * @return Prepare future.
+      */
+     private IgniteFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx,
+         final GridNearTxPrepareRequest<K, V> req) {
+ 
+         // Placeholder that is already complete; intended to be replaced by key preloading (see TODO).
+         IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys.
+ 
+         return new GridEmbeddedFuture<>(
+             ctx.kernalContext(),
+             fut,
+             new C2<Object, Exception, IgniteFuture<IgniteTxEx<K, V>>>() {
+                 @Override public IgniteFuture<IgniteTxEx<K, V>> apply(Object o, Exception ex) {
+                     // Propagate a failure of the preceding stage instead of starting prepare.
+                     if (ex != null)
+                         throw new GridClosureException(ex);
+ 
+                     // Note: this local 'fut' shadows the outer placeholder future.
+                     IgniteFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(),
+                         req.transactionNodes(), req.last(), req.lastBackups());
+ 
+                     if (locTx.isRollbackOnly())
+                         locTx.rollbackAsync();
+ 
+                     return fut;
+                 }
+             },
+             new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
+                 @Nullable @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> tx, Exception e) {
+                     if (e != null) {
+                         // tx can be null if exception occurred.
+                         if (tx != null)
+                             tx.setRollbackOnly(); // Just in case.
+ 
+                         // Optimistic failures are not logged as errors here.
+                         if (!(e instanceof IgniteTxOptimisticException))
+                             U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+                     }
+ 
+                     return tx;
+                 }
+             }
+         );
+     }
+ 
+     /**
+      * Prepares near transaction.
+      *
+      * @param nearNodeId Near node ID that initiated transaction.
+      * @param req Near prepare request.
+      * @return Prepare future.
+      */
+     private IgniteFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId,
+         final GridNearTxPrepareRequest<K, V> req) {
+         ClusterNode nearNode = ctx.node(nearNodeId);
+ 
+         if (nearNode == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId);
+ 
+             return null;
+         }
+ 
+         try {
+             for (IgniteTxEntry<K, V> e : F.concat(false, req.reads(), req.writes()))
+                 e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+         }
+         catch (IgniteCheckedException e) {
+             return new GridFinishedFuture<>(ctx.kernalContext(), e);
+         }
+ 
+         GridDhtTxLocal<K, V> tx;
+ 
+         GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
+ 
+         if (mappedVer != null) {
+             tx = ctx.tm().tx(mappedVer);
+ 
+             if (tx == null)
+                 U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
+                     + ", mappedVer=" + mappedVer + ']');
+         }
+         else {
+             tx = new GridDhtTxLocal<>(
+                 ctx,
+                 nearNode.id(),
+                 req.version(),
+                 req.futureId(),
+                 req.miniId(),
+                 req.threadId(),
+                 /*implicit*/false,
+                 /*implicit-single*/false,
+                 req.system(),
+                 req.concurrency(),
+                 req.isolation(),
+                 req.timeout(),
+                 req.isInvalidate(),
+                 false,
+                 req.txSize(),
+                 req.groupLockKey(),
+                 req.partitionLock(),
+                 req.transactionNodes(),
+                 req.subjectId(),
+                 req.taskNameHash()
+             );
+ 
+             tx = ctx.tm().onCreated(tx);
+ 
+             if (tx != null)
+                 tx.topologyVersion(req.topologyVersion());
+             else
+                 U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+                     tx.xid() + ", req=" + req + ']');
+         }
+ 
+         if (tx != null) {
+             IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
+                 req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
+                 req.lastBackups());
+ 
+             if (tx.isRollbackOnly()) {
+                 try {
+                     tx.rollback();
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to rollback transaction: " + tx, e);
+                 }
+             }
+ 
+             final GridDhtTxLocal<K, V> tx0 = tx;
+ 
+             fut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() {
+                 @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> txFut) {
+                     try {
+                         txFut.get();
+                     }
+                     catch (IgniteCheckedException e) {
+                         tx0.setRollbackOnly(); // Just in case.
+ 
+                         if (!(e instanceof IgniteTxOptimisticException))
+                             U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
+                     }
+                 }
+             });
+ 
+             return fut;
+         }
+         else
+             return new GridFinishedFuture<>(ctx.kernalContext(), (IgniteTxEx<K, V>)null);
+     }
+ 
+     /**
+      * Passes a near prepare response to the prepare future looked up through {@code ctx.mvcc()}
+      * by the response version and future ID. A response without a matching future is only traced.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
+         GridNearTxPrepareFuture<K, V> prepFut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc()
+             .<IgniteTxEx<K, V>>future(res.version(), res.futureId());
+ 
+         if (prepFut != null) {
+             prepFut.onResult(nodeId, res);
+ 
+             return;
+         }
+ 
+         // No matching future: nothing to complete.
+         if (log.isDebugEnabled())
+             log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
+     }
+ 
+     /**
+      * Notifies the transaction manager that the remote finish completed, then passes the response
+      * to the finish future looked up through {@code ctx.mvcc()} by xid and future ID.
+      * A response without a matching future is only traced.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
+         // Must happen regardless of whether the future is still around.
+         ctx.tm().onFinishedRemote(nodeId, res.threadId());
+ 
+         GridNearTxFinishFuture<K, V> finFut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(
+             res.xid(), res.futureId());
+ 
+         if (finFut != null) {
+             finFut.onResult(nodeId, res);
+ 
+             return;
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']');
+     }
+ 
+     /**
+      * Passes a DHT prepare response to the prepare future looked up through {@code ctx.mvcc()}
+      * by the response version and future ID. A response for an unknown future is ignored.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
+         GridDhtTxPrepareFuture<K, V> prepFut = (GridDhtTxPrepareFuture<K, V>)ctx.mvcc().
+             <IgniteTxEx<K, V>>future(res.version(), res.futureId());
+ 
+         if (prepFut != null) {
+             prepFut.onResult(nodeId, res);
+ 
+             return;
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Received response for unknown future (will ignore): " + res);
+     }
+ 
+     /**
+      * Passes a DHT finish response to the finish future looked up through {@code ctx.mvcc()}
+      * by xid and future ID. A response for an unknown future is ignored.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
+         assert nodeId != null;
+         assert res != null;
+ 
+         GridDhtTxFinishFuture<K, V> finFut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(res.xid(),
+             res.futureId());
+ 
+         if (finFut != null) {
+             finFut.onResult(nodeId, res);
+ 
+             return;
+         }
+ 
+         if (log.isDebugEnabled())
+             log.debug("Received response for unknown future (will ignore): " + res);
+     }
+ 
+     /**
+      * Handles a near finish request by delegating to
+      * {@link #finish(UUID, GridNearTxLocal, GridNearTxFinishRequest)} without a local transaction.
+      *
+      * @param nodeId Node ID.
+      * @param req Request.
+      * @return Future.
+      */
+     @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) {
+         IgniteFuture<IgniteTx> finishFut = finish(nodeId, null, req);
+ 
+         return finishFut;
+     }
+ 
+     /**
+      * Finishes a transaction for a near finish request. Depending on how the optional local
+      * transaction is mapped, the colocated part, the DHT part, or both are finished; when both
+      * produce a future, the two are combined into one compound future.
+      *
+      * @param nodeId Node ID.
+      * @param locTx Optional local transaction.
+      * @param req Request.
+      * @return Future.
+      */
+     @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+         GridNearTxFinishRequest<K, V> req) {
+         assert nodeId != null;
+         assert req != null;
+ 
+         boolean hasLocTx = locTx != null;
+ 
+         // Transaction on local cache only.
+         if (hasLocTx && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
+             return new GridFinishedFutureEx<IgniteTx>(locTx);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
+ 
+         // Colocated part is always finished before the DHT part.
+         IgniteFuture<IgniteTx> colocFut = hasLocTx && locTx.colocatedLocallyMapped() ?
+             finishColocatedLocal(req.commit(), locTx) : null;
+ 
+         IgniteFuture<IgniteTx> dhtFut = null;
+ 
+         if (!hasLocTx || locTx.nearLocallyMapped()) {
+             if (hasLocTx)
+                 req.cloneEntries();
+ 
+             dhtFut = finishDhtLocal(nodeId, locTx, req);
+         }
+ 
+         if (colocFut == null)
+             return dhtFut;
+ 
+         if (dhtFut == null)
+             return colocFut;
+ 
+         GridCompoundFuture<IgniteTx, IgniteTx> compound = new GridCompoundFuture<>(ctx.kernalContext());
+ 
+         compound.add(colocFut);
+         compound.add(dhtFut);
+ 
+         compound.markInitialized();
+ 
+         return compound;
+     }
+ 
+     /**
+      * @param nodeId Node ID initiated commit.
+      * @param locTx Optional local transaction.
+      * @param req Finish request.
+      * @return Finish future.
+      */
+     private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+         GridNearTxFinishRequest<K, V> req) {
+         GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+ 
+         GridDhtTxLocal<K, V> tx = null;
+ 
+         if (dhtVer == null) {
+             if (log.isDebugEnabled())
+                 log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req);
+         }
+         else
+             tx = ctx.tm().tx(dhtVer);
+ 
+         if (tx == null && !req.explicitLock()) {
+             assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx;
+ 
+             U.warn(log, "Received finish request for completed transaction (the message may be too late " +
+                 "and transaction could have been DGCed by now) [commit=" + req.commit() +
+                 ", xid=" + req.version() + ']');
+ 
+             // Always send finish response.
+             GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(),
+                 req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+ 
+             try {
+                 ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+             }
+             catch (Throwable e) {
+                 // Double-check.
+                 if (ctx.discovery().node(nodeId) == null) {
+                     if (log.isDebugEnabled())
+                         log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
+                             ']');
+                 }
+                 else
+                     U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
+                         "res=" + res + ']', e);
+             }
+ 
+             return null;
+         }
+ 
+         try {
+             if (req.commit()) {
+                 if (tx == null) {
+                     // Create transaction and add entries.
+                     tx = ctx.tm().onCreated(
+                         new GridDhtTxLocal<>(
+                             ctx,
+                             nodeId,
+                             req.version(),
+                             req.futureId(),
+                             req.miniId(),
+                             req.threadId(),
+                             true,
+                             false, /* we don't know, so assume false. */
+                             req.system(),
+                             PESSIMISTIC,
+                             READ_COMMITTED,
+                             /*timeout */0,
+                             req.isInvalidate(),
+                             req.storeEnabled(),
+                             req.txSize(),
+                             req.groupLockKey(),
+                             false,
+                             null,
+                             req.subjectId(),
+                             req.taskNameHash()));
+ 
+                     if (tx == null || !ctx.tm().onStarted(tx))
+                         throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + req);
+ 
+                     tx.topologyVersion(req.topologyVersion());
+                 }
+ 
+                 tx.storeEnabled(req.storeEnabled());
+ 
+                 if (!tx.markFinalizing(USER_FINISH)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+ 
+                     return null;
+                 }
+ 
+                 if (!tx.syncCommit())
+                     tx.syncCommit(req.syncCommit());
+ 
+                 tx.nearFinishFutureId(req.futureId());
+                 tx.nearFinishMiniId(req.miniId());
+                 tx.recoveryWrites(req.recoveryWrites());
+ 
+                 Collection<IgniteTxEntry<K, V>> writeEntries = req.writes();
+ 
+                 if (!F.isEmpty(writeEntries)) {
+                     // In OPTIMISTIC mode, we get the values at PREPARE stage.
+                     assert tx.concurrency() == PESSIMISTIC;
+ 
+                     for (IgniteTxEntry<K, V> entry : writeEntries)
+                         tx.addEntry(req.messageId(), entry);
+                 }
+ 
+                 if (tx.pessimistic())
+                     tx.prepare();
+ 
+                 IgniteFuture<IgniteTx> commitFut = tx.commitAsync();
+ 
+                 // Only for error logging.
+                 commitFut.listenAsync(CU.errorLogger(log));
+ 
+                 return commitFut;
+             }
+             else {
+                 assert tx != null : "Transaction is null for near rollback request [nodeId=" +
+                     nodeId + ", req=" + req + "]";
+ 
+                 tx.syncRollback(req.syncRollback());
+ 
+                 tx.nearFinishFutureId(req.futureId());
+                 tx.nearFinishMiniId(req.miniId());
+ 
+                 IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+ 
+                 // Only for error logging.
+                 rollbackFut.listenAsync(CU.errorLogger(log));
+ 
+                 return rollbackFut;
+             }
+         }
+         catch (Throwable e) {
+             U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+ 
+             if (tx != null) {
+                 IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+ 
+                 // Only for error logging.
+                 rollbackFut.listenAsync(CU.errorLogger(log));
+ 
+                 return rollbackFut;
+             }
+ 
+             return new GridFinishedFuture<>(ctx.kernalContext(), e);
+         }
+     }
+ 
+     /**
+      * @param commit Commit flag (rollback if {@code false}).
+      * @param tx Transaction to commit.
+      * @return Future.
+      */
+     public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) {
+         try {
+             if (commit) {
+                 if (!tx.markFinalizing(USER_FINISH)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+ 
+                     return null;
+                 }
+ 
+                 return tx.commitAsyncLocal();
+             }
+             else
+                 return tx.rollbackAsyncLocal();
+         }
+         catch (Throwable e) {
+             U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
+ 
+             if (tx != null)
+                 return tx.rollbackAsync();
+ 
+             return new GridFinishedFuture<>(ctx.kernalContext(), e);
+         }
+     }
+ 
+     /**
+      * Processes a DHT prepare request: starts the near remote transaction (only when the request
+      * carries near writes) and the DHT remote transaction, fills the response with evicted near
+      * keys and invalid partitions, and replies to the sender. If preparing fails, both
+      * transactions are rolled back and an error response is sent instead; if the reply itself
+      * cannot be sent, both transactions are rolled back as well.
+      *
+      * @param nodeId Sender node ID.
+      * @param req Request.
+      */
+     protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest<K, V> req) {
+         assert nodeId != null;
+         assert req != null;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() +
+                 ", nodeId=" + nodeId + ", req=" + req + ']');
+ 
+         GridDhtTxRemote<K, V> dhtTx = null;
+         GridNearTxRemote<K, V> nearTx = null;
+ 
+         GridDhtTxPrepareResponse<K, V> res;
+ 
+         try {
+             res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId());
+ 
+             // Start near transaction first.
+             nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
+             dhtTx = startRemoteTx(nodeId, req, res);
+ 
+             // Set evicted keys from near transaction.
+             if (nearTx != null)
+                 res.nearEvicted(nearTx.evicted());
+ 
+             if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
+                 res.invalidPartitions(dhtTx.invalidPartitions());
+         }
+         catch (IgniteCheckedException e) {
+             if (e instanceof IgniteTxRollbackException)
+                 U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e);
+             else if (e instanceof IgniteTxOptimisticException) {
+                 if (log.isDebugEnabled())
+                     log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx);
+             }
+             else
+                 U.error(log, "Failed to process prepare request: " + req, e);
+ 
+             if (nearTx != null)
+                 nearTx.rollback();
+ 
+             if (dhtTx != null)
+                 dhtTx.rollback();
+ 
+             // Replace the response with one that carries the error back to the sender.
+             res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e);
+         }
+ 
+         try {
+             // Reply back to sender.
+             ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+         }
+         catch (IgniteCheckedException e) {
+             if (e instanceof ClusterTopologyException) {
+                 // Closing bracket added so the message matches the other "[...]" log messages.
+                 if (log.isDebugEnabled())
+                     log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId +
+                         ", xid=" + req.version() + ']');
+             }
+             else
+                 U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId +
+                     ", xid=" + req.version() + ", err=" +  e.getMessage() + ']');
+ 
+             if (nearTx != null)
+                 nearTx.rollback();
+ 
+             if (dhtTx != null)
+                 dhtTx.rollback();
+         }
+     }
+ 
+     /**
+      * Processes a DHT finish request: looks up (or, when the request carries writes, starts) the
+      * remote DHT and near transactions, applies the sync and one-phase-commit settings from the
+      * request, finishes both transactions and replies to the sender. A rollback exception while
+      * starting the transactions means the transaction is already completed: only the reply is sent.
+      * Other failures invalidate the transactions before they are finished.
+      *
+      * @param nodeId Node ID.
+      * @param req Request.
+      */
+     @SuppressWarnings({"unchecked"})
+     protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest<K, V> req) {
+         assert nodeId != null;
+         assert req != null;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
+ 
+         GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version());
+         GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version());
+ 
+         try {
+             // No DHT transaction yet, but the request carries writes: start one from the request.
+             if (dhtTx == null && !F.isEmpty(req.writes()))
+                 dhtTx = startRemoteTxForFinish(nodeId, req);
+ 
+             if (dhtTx != null) {
+                 dhtTx.syncCommit(req.syncCommit());
+                 dhtTx.syncRollback(req.syncRollback());
+             }
+ 
+             // One-phase commit transactions send finish requests to backup nodes.
+             if (dhtTx != null && req.onePhaseCommit()) {
+                 dhtTx.onePhaseCommit(true);
+ 
+                 dhtTx.writeVersion(req.writeVersion());
+             }
+ 
+             // A near transaction is started here only for group-lock requests that carry near writes.
+             if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock())
+                 nearTx = startNearRemoteTxForFinish(nodeId, req);
+ 
+             if (nearTx != null) {
+                 nearTx.syncCommit(req.syncCommit());
+                 nearTx.syncRollback(req.syncRollback());
+             }
+         }
+         catch (IgniteTxRollbackException e) {
+             if (log.isDebugEnabled())
+                 log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" +
+                     e.getMessage() + ']');
+ 
+             sendReply(nodeId, req);
+ 
+             return;
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" +
+                 dhtTx + ", nearTx=" + nearTx + ']', e);
+ 
+             if (dhtTx != null)
+                 dhtTx.invalidate(true);
+ 
+             if (nearTx != null)
+                 nearTx.invalidate(true);
+         }
+         catch (GridDistributedLockCancelledException ignore) {
+             U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" +
+                 dhtTx + ", nearTx=" + nearTx + ']');
+ 
+             if (dhtTx != null)
+                 dhtTx.invalidate(true);
+ 
+             if (nearTx != null)
+                 nearTx.invalidate(true);
+         }
+ 
+         // Safety - local transaction will finish explicitly.
+         if (nearTx != null && nearTx.local())
+             nearTx = null;
+ 
 -        finish(nodeId, dhtTx, req, req.writes());
++        finish(nodeId, dhtTx, req, req.writes(), req.ttls());
+ 
+         if (nearTx != null)
 -            finish(nodeId, nearTx, req, req.nearWrites());
++            finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls());
+ 
+         // Reply is sent after both transactions have been handed to finish().
+         sendReply(nodeId, req);
+     }
+ 
+     /**
++     * Finishes a remote transaction as instructed by a DHT finish request.
++     * <p>
++     * When {@code tx} is {@code null} the request version is only recorded as committed or rolled back.
++     * Otherwise the transaction is committed if the request asks for commit or system invalidation and
++     * {@code tx.commitVersion(...)} returns {@code true}, and rolled back if the request asks for neither.
++     * On any failure the transaction is marked invalidated and system-invalidated and then committed
++     * in that state.
++     *
+      * @param nodeId Node ID.
+      * @param tx Transaction.
+      * @param req Request.
+      * @param writes Writes.
++     * @param ttls TTLs for optimistic transaction, applied positionally to {@code tx.writeEntries()}.
+      */
+     protected void finish(
+         UUID nodeId,
+         IgniteTxRemoteEx<K, V> tx,
+         GridDhtTxFinishRequest<K, V> req,
 -        Collection<IgniteTxEntry<K, V>> writes) {
++        Collection<IgniteTxEntry<K, V>> writes,
++        @Nullable GridLongList ttls) {
+         // We don't allow explicit locks for transactions and
+         // therefore immediately return if transaction is null.
+         // However, we may decide to relax this restriction in
+         // future.
+         if (tx == null) {
+             if (req.commit())
+                 // Must be some long time duplicate, but we add it anyway.
+                 ctx.tm().addCommittedTx(req.version(), null);
+             else
+                 ctx.tm().addRolledbackTx(req.version());
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Received finish request for non-existing transaction (added to completed set) " +
+                     "[senderNodeId=" + nodeId + ", res=" + req + ']');
+ 
+             return;
+         }
+         else if (log.isDebugEnabled())
+             log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
+                 ", tx=" + tx + ']');
+ 
++        assert ttls == null || tx.concurrency() == OPTIMISTIC;
++
+         try {
+             if (req.commit() || req.isSystemInvalidate()) {
+                 if (tx.commitVersion(req.commitVersion())) {
+                     tx.invalidate(req.isInvalidate());
+                     tx.systemInvalidate(req.isSystemInvalidate());
+ 
+                     if (!F.isEmpty(writes)) {
+                         // In OPTIMISTIC mode, we get the values at PREPARE stage.
+                         assert tx.concurrency() == PESSIMISTIC;
+ 
+                         for (IgniteTxEntry<K, V> entry : writes) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" +
+                                     entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']');
+ 
+                             if (!tx.setWriteValue(entry))
+                                 U.warn(log, "Received entry to commit that was not present in transaction [entry=" +
+                                     entry + ", tx=" + tx + ']');
+                         }
+                     }
++                    else if (tx.concurrency() == OPTIMISTIC && ttls != null) {
++                        // TTLs are positional: the i-th value belongs to the i-th write entry,
++                        // so the index has to advance together with the iteration.
++                        int idx = 0;
++
++                        for (IgniteTxEntry<K, V> e : tx.writeEntries())
++                            e.ttl(ttls.get(idx++));
++                    }
+ 
+                     // Complete remote candidates.
+                     tx.doneRemote(req.baseVersion(), null, null, null);
+ 
+                     if (tx.pessimistic())
+                         tx.prepare();
+ 
+                     tx.commit();
+                 }
+             }
+             else {
+                 assert tx != null;
+ 
+                 tx.doneRemote(req.baseVersion(), null, null, null);
+ 
+                 tx.rollback();
+             }
+         }
+         catch (Throwable e) {
+             U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+ 
+             // Mark transaction for invalidate.
+             tx.invalidate(true);
+             tx.systemInvalidate(true);
+ 
+             try {
+                 tx.commit();
+             }
+             catch (IgniteCheckedException ex) {
+                 U.error(log, "Failed to invalidate transaction: " + tx, ex);
+             }
+         }
+     }
+ 
+     /**
+      * Replies to a tx finish request with a finish response, but only when the request asks for a reply.
+      * A send failure is logged as an error if the target node is still known to discovery,
+      * and at debug level otherwise.
+      *
+      * @param nodeId Node id that originated finish request.
+      * @param req Request.
+      */
+     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) {
+         if (!req.replyRequired())
+             return;
+ 
+         GridCacheMessage<K, V> finishRes =
+             new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
+ 
+         try {
+             ctx.io().send(nodeId, finishRes, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+         }
+         catch (Throwable err) {
+             // Tell a real send failure apart from the target simply having left the grid.
+             boolean nodeAlive = ctx.discovery().node(nodeId) != null;
+ 
+             if (nodeAlive)
+                 U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + finishRes + ']', err);
+             else if (log.isDebugEnabled())
+                 log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + finishRes + ']');
+         }
+     }
+ 
+     /**
++     * Looks up, or creates and registers, the remote DHT transaction for a prepare request, adds the
++     * request's write entries to it and prepares it. Nothing is done if the request carries no writes.
++     * <p>
++     * Writes are added only if the transaction is not system-invalidated. For each write entry:
++     * <ul>
++     * <li>the near entry is invalidated when near cache is enabled for the entry's cache context and the
++     *     request flags the entry at that index for near invalidation;</li>
++     * <li>when the request flags the key for preload, the cached entry's info is added to the response
++     *     unless it is {@code null}, new or deleted;</li>
++     * <li>on {@code GridDhtInvalidPartitionException} the partition is recorded on the transaction and the
++     *     entry is cleared from it.</li>
++     * </ul>
++     * After {@code prepare()}, a request marked as last moves the transaction to {@code PREPARED}; the
++     * transaction's invalid partitions are copied to the response.
++     *
+      * @param nodeId Node ID.
+      * @param req Request.
+      * @param res Response.
+      * @return Remote transaction, or {@code null} if the request has no writes, the transaction could not be
+      *      created or started, or it was empty on the last request and has been rolled back.
+      * @throws IgniteCheckedException If failed.
+      */
+     @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
+         GridDhtTxPrepareRequest<K, V> req,
+         GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+         if (!F.isEmpty(req.writes())) {
+             GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version());
+ 
+             assert F.isEmpty(req.candidatesByKey());
+ 
+             if (tx == null) {
+                 tx = new GridDhtTxRemote<>(
+                     ctx,
+                     req.nearNodeId(),
+                     req.futureId(),
+                     nodeId,
+                     req.threadId(),
+                     req.topologyVersion(),
+                     req.version(),
+                     req.commitVersion(),
+                     req.system(),
+                     req.concurrency(),
+                     req.isolation(),
+                     req.isInvalidate(),
+                     req.timeout(),
+                     req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
+                     req.groupLockKey(),
+                     req.nearXidVersion(),
+                     req.transactionNodes(),
+                     req.subjectId(),
+                     req.taskNameHash());
+ 
+                 tx = ctx.tm().onCreated(tx);
+ 
+                 if (tx == null || !ctx.tm().onStarted(tx)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Attempt to start a completed transaction (will ignore): " + tx);
+ 
+                     return null;
+                 }
+             }
+ 
+             if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
+                 int idx = 0;
+ 
+                 for (IgniteTxEntry<K, V> entry : req.writes()) {
+                     GridCacheContext<K, V> cacheCtx = entry.context();
+ 
+                     tx.addWrite(entry, ctx.deploy().globalLoader());
+ 
+                     if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx))
+                         invalidateNearEntry(cacheCtx, entry.key(), req.version());
+ 
+                     try {
+                         if (req.needPreloadKey(idx)) {
+                             GridCacheEntryEx<K, V> cached = entry.cached();
+ 
+                             if (cached == null)
+                                 cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+ 
+                             GridCacheEntryInfo<K, V> info = cached.info();
+ 
+                             if (info != null && !info.isNew() && !info.isDeleted())
+                                 res.addPreloadEntry(info);
+                         }
+                     }
+                     catch (GridDhtInvalidPartitionException e) {
+                         tx.addInvalidPartition(cacheCtx, e.partition());
+ 
+                         tx.clearEntry(entry.txKey());
+                     }
+ 
+                     idx++;
+                 }
+             }
+ 
+             // Prepare prior to reordering, so the pending locks added
+             // in prepare phase will get properly ordered as well.
+             tx.prepare();
+ 
+             if (req.last())
+                 tx.state(PREPARED);
+ 
+             res.invalidPartitions(tx.invalidPartitions());
+ 
+             if (tx.empty() && req.last()) {
+                 tx.rollback();
+ 
+                 return null;
+             }
+ 
+             return tx;
+         }
+ 
+         return null;
+     }
+ 
+     /**
+      * Invalidates the near cache entry for the given key, if the near cache currently holds one.
+      * The near cache is taken from the context directly when the context is near, and through
+      * its DHT cache otherwise.
+      *
+      * @param cacheCtx Cache context.
+      * @param key Key
+      * @param ver Version.
+      * @throws IgniteCheckedException If invalidate failed.
+      */
+     private void invalidateNearEntry(GridCacheContext<K, V> cacheCtx, K key, GridCacheVersion ver)
+         throws IgniteCheckedException {
+         GridNearCacheAdapter<K, V> nearCache;
+ 
+         if (cacheCtx.isNear())
+             nearCache = cacheCtx.near();
+         else
+             nearCache = cacheCtx.dht().near();
+ 
+         GridCacheEntryEx<K, V> peeked = nearCache.peekEx(key);
+ 
+         // Nothing to invalidate if the near cache does not hold the key.
+         if (peeked == null)
+             return;
+ 
+         peeked.invalidate(null, ver);
+     }
+ 
+     /**
+      * Called while processing dht tx prepare request.
++     * <p>
++     * If the request carries near writes, the near remote transaction registered for the request version is
++     * looked up. When there is none, a new {@code GridNearTxRemote} is built from the near writes and, unless
++     * it is empty, registered with the transaction manager; an {@code IgniteTxRollbackException} is thrown if
++     * it cannot be created or started. When one already exists, the near writes are added to it. In both
++     * cases the owned versions from the request are applied and the transaction is prepared.
+      *
+      * @param ldr Loader.
+      * @param nodeId Sender node ID.
+      * @param req Request.
+      * @return Remote transaction, or {@code null} if the request has no near writes.
+      * @throws IgniteCheckedException If failed.
+      */
+     @Nullable public GridNearTxRemote<K, V> startNearRemoteTx(ClassLoader ldr, UUID nodeId,
+         GridDhtTxPrepareRequest<K, V> req) throws IgniteCheckedException {
+         assert F.isEmpty(req.candidatesByKey());
+ 
+         if (!F.isEmpty(req.nearWrites())) {
+             GridNearTxRemote<K, V> tx = ctx.tm().nearTx(req.version());
+ 
+             if (tx == null) {
+                 tx = new GridNearTxRemote<>(
+                     ctx,
+                     ldr,
+                     nodeId,
+                     req.nearNodeId(),
+                     req.threadId(),
+                     req.version(),
+                     req.commitVersion(),
+                     req.system(),
+                     req.concurrency(),
+                     req.isolation(),
+                     req.isInvalidate(),
+                     req.timeout(),
+                     req.nearWrites(),
+                     req.txSize(),
+                     req.groupLockKey(),
+                     req.subjectId(),
+                     req.taskNameHash()
+                 );
+ 
+                 if (!tx.empty()) {
+                     tx = ctx.tm().onCreated(tx);
+ 
+                     if (tx == null || !ctx.tm().onStarted(tx))
+                         throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + tx);
+                 }
+             }
+             else
+                 tx.addEntries(ldr, req.nearWrites());
+ 
+             tx.ownedVersions(req.owned());
+ 
+             // Prepare prior to reordering, so the pending locks added
+             // in prepare phase will get properly ordered as well.
+             tx.prepare();
+ 
+             return tx;
+         }
+ 
+         return null;
+     }
+ 
+     /**
++     * Builds the remote DHT transaction from the write entries carried by a finish request.
++     * <p>
++     * For every write entry the local partition of the key is looked up and reserved for the duration of the
++     * processing. If the partition is missing or cannot be reserved, the entry is cleared from the transaction
++     * (if one exists) and skipped. Otherwise the transaction registered for the request version is used or a
++     * new {@code PESSIMISTIC} {@code GridDhtTxRemote} is created and registered, the write is added, the
++     * transaction is marked as finalizing with {@code USER_FINISH} (once per call), and a remote candidate is
++     * added to the cache entry unless the tx entry has an explicit version or is a group lock entry.
++     * <p>
++     * {@code GridCacheEntryRemovedException} clears the entry from the transaction and retries it;
++     * {@code GridDhtInvalidPartitionException} clears the entry and moves on to the next one.
++     *
+      * @param nodeId Primary node ID.
+      * @param req Request.
+      * @return Remote transaction, or {@code null} if no transaction was found or created, it could not be
+      *      marked as finalizing, the near node is no longer known to discovery (transaction is rolled back),
+      *      or the transaction ended up empty (transaction is rolled back).
+      * @throws IgniteCheckedException If failed.
+      * @throws GridDistributedLockCancelledException If lock has been cancelled.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+         throws IgniteCheckedException, GridDistributedLockCancelledException {
+ 
+         GridDhtTxRemote<K, V> tx = null;
+ 
+         boolean marked = false;
+ 
+         for (IgniteTxEntry<K, V> txEntry : req.writes()) {
+             GridDistributedCacheEntry<K, V> entry = null;
+ 
+             GridCacheContext<K, V> cacheCtx = txEntry.context();
+ 
+             while (true) {
+                 try {
+                     int part = cacheCtx.affinity().partition(txEntry.key());
+ 
+                     GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part,
+                         req.topologyVersion(), false);
+ 
+                     // Handle implicit locks for pessimistic transactions.
+                     if (tx == null)
+                         tx = ctx.tm().tx(req.version());
+ 
+                     if (locPart == null || !locPart.reserve()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Local partition for given key is already evicted (will remove from tx) " +
+                                 "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']');
+ 
+                         if (tx != null)
+                             tx.clearEntry(txEntry.txKey());
+ 
+                         break;
+                     }
+ 
+                     try {
+                         entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(),
+                             req.topologyVersion());
+ 
+                         if (tx == null) {
+                             tx = new GridDhtTxRemote<>(
+                                 ctx,
+                                 req.nearNodeId(),
+                                 req.futureId(),
+                                 nodeId,
+                                 // We can pass null as nearXidVersion as transaction will be committed right away.
+                                 null,
+                                 req.threadId(),
+                                 req.topologyVersion(),
+                                 req.version(),
+                                 /*commitVer*/null,
+                                 req.system(),
+                                 PESSIMISTIC,
+                                 req.isolation(),
+                                 req.isInvalidate(),
+                                 0,
+                                 req.txSize(),
+                                 req.groupLockKey(),
+                                 req.subjectId(),
+                                 req.taskNameHash());
+ 
+                             tx = ctx.tm().onCreated(tx);
+ 
+                             if (tx == null || !ctx.tm().onStarted(tx))
+                                 throw new IgniteTxRollbackException("Failed to acquire lock " +
+                                     "(transaction has been completed): " + req.version());
+                         }
+ 
+                         tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
+                             txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
+ 
+                         if (!marked) {
+                             if (tx.markFinalizing(USER_FINISH))
+                                 marked = true;
+                             else {
+                                 tx.clearEntry(txEntry.txKey());
+ 
+                                 return null;
+                             }
+                         }
+ 
+                         // Add remote candidate before reordering.
+                         if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+                             entry.addRemote(
+                                 req.nearNodeId(),
+                                 nodeId,
+                                 req.threadId(),
+                                 req.version(),
+                                 0,
+                                 /*tx*/true,
+                                 tx.implicitSingle(),
+                                 null
+                             );
+ 
+                         // Double-check in case if sender node left the grid.
+                         if (ctx.discovery().node(req.nearNodeId()) == null) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+ 
+                             tx.rollback();
+ 
+                             return null;
+                         }
+ 
+                         // Entry is legit.
+                         break;
+                     }
+                     finally {
+                         locPart.release();
+                     }
+                 }
+                 catch (GridCacheEntryRemovedException ignored) {
+                     assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+                         entry;
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+ 
+                     tx.clearEntry(txEntry.txKey()); // NOTE(review): no null check on tx here - confirm.
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+                             entry + ", tx=" + tx + ']');
+                 }
+                 catch (GridDhtInvalidPartitionException p) {
+                     if (log.isDebugEnabled())
+                         log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" +
+                             req + ", txEntry=" + txEntry + ']');
+ 
+                     if (tx != null)
+                         tx.clearEntry(txEntry.txKey());
+ 
+                     break;
+                 }
+             }
+         }
+ 
+         if (tx != null && tx.empty()) {
+             tx.rollback();
+ 
+             return null;
+         }
+ 
+         return tx;
+     }
+ 
+     /**
++     * Builds the near remote transaction from the near write entries carried by a finish request.
++     * The request is asserted to be a group lock request.
++     * <p>
++     * The global deployment class loader must be available, otherwise a warning is logged and an
++     * {@code IgniteCheckedException} is thrown; the loader is not otherwise used in this method.
++     * <p>
++     * For every near write whose key is present in the near cache, the near transaction registered for the
++     * request version is used or a new {@code PESSIMISTIC} {@code GridNearTxRemote} is created and registered.
++     * The transaction is marked as finalizing with {@code USER_FINISH}, the entry is added to it, the group
++     * lock is marked (and the group lock key set for entries that are not group lock entries), and a remote
++     * candidate is added to the near entry unless the tx entry has an explicit version or is a group lock
++     * entry. {@code GridCacheEntryRemovedException} clears the entry from the transaction and retries it.
++     *
+      * @param nodeId Primary node ID.
+      * @param req Request.
+      * @return Remote transaction, or {@code null} if no near entry matched, the transaction could not be marked
+      *      as finalizing, the transaction is local, or the near node is no longer known to discovery
+      *      (an existing transaction is rolled back in that case).
+      * @throws IgniteCheckedException If failed.
+      * @throws GridDistributedLockCancelledException If lock has been cancelled.
+      */
+     @SuppressWarnings({"RedundantTypeArguments"})
+     @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+         throws IgniteCheckedException, GridDistributedLockCancelledException {
+         assert req.groupLock();
+ 
+         GridNearTxRemote<K, V> tx = null;
+ 
+         ClassLoader ldr = ctx.deploy().globalLoader();
+ 
+         if (ldr != null) {
+             boolean marked = false;
+ 
+             for (IgniteTxEntry<K, V> txEntry : req.nearWrites()) {
+                 GridDistributedCacheEntry<K, V> entry = null;
+ 
+                 GridCacheContext<K, V> cacheCtx = txEntry.context();
+ 
+                 while (true) {
+                     try {
+                         entry = cacheCtx.near().peekExx(txEntry.key());
+ 
+                         if (entry != null) {
+                             entry.keyBytes(txEntry.keyBytes());
+ 
+                             // Handle implicit locks for pessimistic transactions.
+                             if (tx == null)
+                                 tx = ctx.tm().nearTx(req.version());
+ 
+                             if (tx == null) {
+                                 tx = new GridNearTxRemote<>(
+                                     ctx,
+                                     nodeId,
+                                     req.nearNodeId(),
+                                     // We can pass null as nearXidVer as transaction will be committed right away.
+                                     null,
+                                     req.threadId(),
+                                     req.version(),
+                                     null,
+                                     req.system(),
+                                     PESSIMISTIC,
+                                     req.isolation(),
+                                     req.isInvalidate(),
+                                     0,
+                                     req.txSize(),
+                                     req.groupLockKey(),
+                                     req.subjectId(),
+                                     req.taskNameHash());
+ 
+                                 tx = ctx.tm().onCreated(tx);
+ 
+                                 if (tx == null || !ctx.tm().onStarted(tx))
+                                     throw new IgniteTxRollbackException("Failed to acquire lock " +
+                                         "(transaction has been completed): " + req.version());
+ 
+                                 if (!marked)
+                                     marked = tx.markFinalizing(USER_FINISH);
+ 
+                                 if (!marked)
+                                     return null;
+                             }
+ 
+                             if (tx.local())
+                                 return null;
+ 
+                             if (!marked)
+                                 marked = tx.markFinalizing(USER_FINISH);
+ 
+                             if (marked)
+                                 tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(),
+                                     txEntry.valueBytes(), txEntry.drVersion());
+                             else
+                                 return null;
+ 
+                             if (req.groupLock()) {
+                                 tx.markGroupLock();
+ 
+                                 if (!txEntry.groupLockEntry())
+                                     tx.groupLockKey(txEntry.txKey());
+                             }
+ 
+                             // Add remote candidate before reordering.
+                             if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+                                 entry.addRemote(
+                                     req.nearNodeId(),
+                                     nodeId,
+                                     req.threadId(),
+                                     req.version(),
+                                     0,
+                                     /*tx*/true,
+                                     tx.implicitSingle(),
+                                     null
+                                 );
+                         }
+ 
+                         // Double-check in case if sender node left the grid.
+                         if (ctx.discovery().node(req.nearNodeId()) == null) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+ 
+                             if (tx != null)
+                                 tx.rollback();
+ 
+                             return null;
+                         }
+ 
+                         // Entry is legit.
+                         break;
+                     }
+                     catch (GridCacheEntryRemovedException ignored) {
+                         assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+                             entry;
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+ 
+                         if (tx != null) {
+                             tx.clearEntry(txEntry.txKey());
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+                                     entry + ", tx=" + tx + ']');
+                         }
+ 
+                         // Will retry in while loop.
+                     }
+                 }
+             }
+         }
+         else {
+             String err = "Failed to acquire deployment class loader for message: " + req;
+ 
+             U.warn(log, err);
+ 
+             throw new IgniteCheckedException(err);
+         }
+ 
+         return tx;
+     }
+ 
+     /**
+      * Handles a check-prepared request: asks the transaction manager whether the listed transactions are
+      * prepared or committed and sends the answer back to the requesting node.
+      *
+      * @param nodeId Node ID.
+      * @param req Request.
+      */
+     protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+         if (log.isDebugEnabled())
+             log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
+ 
+         boolean allPrepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+ 
+         GridCacheOptimisticCheckPreparedTxResponse<K, V> reply = new GridCacheOptimisticCheckPreparedTxResponse<>(
+             req.version(),
+             req.futureId(),
+             req.miniId(),
+             allPrepared);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + reply + ']');
+ 
+         try {
+             ctx.io().send(nodeId, reply);
+         }
+         catch (ClusterTopologyException ignored) {
+             // Topology failure is reported at debug level only.
+             if (log.isDebugEnabled())
+                 log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" +
+                     nodeId + ", res=" + reply + ']');
+         }
+         catch (IgniteCheckedException err) {
+             U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + reply + ']', err);
+         }
+     }
+ 
+     /**
+      * Routes a check-prepared response to the future registered for its version and future ID.
+      * A response without a matching future is ignored.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+         if (log.isDebugEnabled())
+             log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+ 
+         GridCacheOptimisticCheckPreparedTxFuture<K, V> checkFut =
+             (GridCacheOptimisticCheckPreparedTxFuture<K, V>)ctx.mvcc().<Boolean>future(
+                 res.version(), res.futureId());
+ 
+         if (checkFut != null)
+             checkFut.onResult(nodeId, res);
+         else if (log.isDebugEnabled())
+             log.debug("Received response for unknown future (will ignore): " + res);
+     }
+ 
+     /**
+      * Handles a check-committed request: asynchronously obtains the committed info for the transaction and
+      * sends it back to the requesting node. If the info cannot be obtained, the failure is logged and the
+      * response is sent with {@code null} info.
+      *
+      * @param nodeId Node ID.
+      * @param req Request.
+      */
+     protected void processCheckCommittedTxRequest(final UUID nodeId,
+         final GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+         if (log.isDebugEnabled())
+             log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']');
+ 
+         ctx.tm().checkPessimisticTxCommitted(req).listenAsync(
+             new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() {
+                 @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> doneFut) {
+                     GridCacheCommittedTxInfo<K, V> committedInfo;
+ 
+                     try {
+                         committedInfo = doneFut.get();
+                     }
+                     catch (IgniteCheckedException err) {
+                         U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, err);
+ 
+                         committedInfo = null;
+                     }
+ 
+                     GridCachePessimisticCheckCommittedTxResponse<K, V> reply =
+                         new GridCachePessimisticCheckCommittedTxResponse<>(
+                             req.version(), req.futureId(), req.miniId(), committedInfo);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + reply + ']');
+ 
+                     sendCheckCommittedResponse(nodeId, reply);
+                 }
+             });
+     }
+ 
+     /**
+      * Routes a check-committed response to the future registered for its version and future ID.
+      * A response without a matching future is ignored.
+      *
+      * @param nodeId Node ID.
+      * @param res Response.
+      */
+     protected void processCheckCommittedTxResponse(UUID nodeId,
+         GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+         if (log.isDebugEnabled())
+             log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+ 
+         GridCachePessimisticCheckCommittedTxFuture<K, V> checkFut =
+             (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future(
+                 res.version(), res.futureId());
+ 
+         if (checkFut != null)
+             checkFut.onResult(nodeId, res);
+         else if (log.isDebugEnabled())
+             log.debug("Received response for unknown future (will ignore): " + res);
+     }
+ 
+     /**
+      * Sends check committed response to remote node. A topology failure is reported at debug level,
+      * any other send failure is logged as an error.
+      *
+      * @param nodeId Node ID to send to.
+      * @param res Response to send.
+      */
+     private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+         if (log.isDebugEnabled())
+             log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+ 
+         try {
+             ctx.io().send(nodeId, res);
+         }
+         catch (ClusterTopologyException ignored) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" +
+                     nodeId + ", res=" + res + ']');
+         }
+         catch (IgniteCheckedException err) {
+             U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', err);
+         }
+     }
+ }