You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:47 UTC
[45/50] [abbrv] incubator-ignite git commit: # ignite-41
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f537940c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
index 0000000,e0effe0..2254ddb
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1,0 -1,1493 +1,1504 @@@
+ /* @java.file.header */
+
+ /* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+ package org.gridgain.grid.kernal.processors.cache.transactions;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.gridgain.grid.kernal.processors.cache.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
+ import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
++import org.gridgain.grid.util.*;
+ import org.gridgain.grid.util.future.*;
+ import org.gridgain.grid.util.lang.*;
+ import org.gridgain.grid.util.typedef.*;
+ import org.gridgain.grid.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.util.*;
+
+ import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+ import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+ import static org.apache.ignite.transactions.IgniteTxState.*;
+ import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
+ import static org.gridgain.grid.kernal.processors.cache.transactions.IgniteTxEx.FinalizationStatus.*;
+ import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
+
+ /**
+ * Isolated logic to process cache messages.
+ */
+ public class IgniteTxHandler<K, V> {
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Shared cache context. */
+ private GridCacheSharedContext<K, V> ctx;
+
+ public IgniteFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId,
+ final GridNearTxPrepareRequest<K, V> req) {
+ return prepareTx(nearNodeId, null, req);
+ }
+
+ /**
+ * @param ctx Shared cache context.
+ */
+ public IgniteTxHandler(GridCacheSharedContext<K, V> ctx) {
+ this.ctx = ctx;
+
+ log = ctx.logger(IgniteTxHandler.class);
+
+ ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
+ processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse<K, V>)msg);
+ }
+ });
+
+ ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
+ new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+ processCheckPreparedTxRequest(nodeId, req);
+ }
+ });
+
+ ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
+ new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+ processCheckPreparedTxResponse(nodeId, res);
+ }
+ });
+
+ ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class,
+ new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+ processCheckCommittedTxRequest(nodeId, req);
+ }
+ });
+
+ ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class,
+ new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() {
+ @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+ processCheckCommittedTxResponse(nodeId, res);
+ }
+ });
+ }
+
+ /**
+ * @param nearNodeId Near node ID that initiated transaction.
+ * @param locTx Optional local transaction.
+ * @param req Near prepare request.
+ * @return Future for transaction.
+ */
+ public IgniteFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx,
+ final GridNearTxPrepareRequest<K, V> req) {
+ assert nearNodeId != null;
+ assert req != null;
+
+ if (locTx != null) {
+ if (req.near()) {
+ // Make sure not to provide Near entries to DHT cache.
+ req.cloneEntries();
+
+ return prepareNearTx(nearNodeId, req);
+ }
+ else
+ return prepareColocatedTx(locTx, req);
+ }
+ else
+ return prepareNearTx(nearNodeId, req);
+ }
+
+ /**
+ * Prepares local colocated tx.
+ *
+ * @param locTx Local transaction.
+ * @param req Near prepare request.
+ * @return Prepare future.
+ */
+ private IgniteFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx,
+ final GridNearTxPrepareRequest<K, V> req) {
+
+ IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys.
+
+ return new GridEmbeddedFuture<>(
+ ctx.kernalContext(),
+ fut,
+ new C2<Object, Exception, IgniteFuture<IgniteTxEx<K, V>>>() {
+ @Override public IgniteFuture<IgniteTxEx<K, V>> apply(Object o, Exception ex) {
+ if (ex != null)
+ throw new GridClosureException(ex);
+
+ IgniteFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(),
+ req.transactionNodes(), req.last(), req.lastBackups());
+
+ if (locTx.isRollbackOnly())
+ locTx.rollbackAsync();
+
+ return fut;
+ }
+ },
+ new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
+ @Nullable @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> tx, Exception e) {
+ if (e != null) {
+ // tx can be null of exception occurred.
+ if (tx != null)
+ tx.setRollbackOnly(); // Just in case.
+
+ if (!(e instanceof IgniteTxOptimisticException))
+ U.error(log, "Failed to prepare DHT transaction: " + tx, e);
+ }
+
+ return tx;
+ }
+ }
+ );
+ }
+
+ /**
+ * Prepares near transaction.
+ *
+ * @param nearNodeId Near node ID that initiated transaction.
+ * @param req Near prepare request.
+ * @return Prepare future.
+ */
+ private IgniteFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId,
+ final GridNearTxPrepareRequest<K, V> req) {
+ ClusterNode nearNode = ctx.node(nearNodeId);
+
+ if (nearNode == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId);
+
+ return null;
+ }
+
+ try {
+ for (IgniteTxEntry<K, V> e : F.concat(false, req.reads(), req.writes()))
+ e.unmarshal(ctx, false, ctx.deploy().globalLoader());
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(ctx.kernalContext(), e);
+ }
+
+ GridDhtTxLocal<K, V> tx;
+
+ GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
+
+ if (mappedVer != null) {
+ tx = ctx.tm().tx(mappedVer);
+
+ if (tx == null)
+ U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
+ + ", mappedVer=" + mappedVer + ']');
+ }
+ else {
+ tx = new GridDhtTxLocal<>(
+ ctx,
+ nearNode.id(),
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ /*implicit*/false,
+ /*implicit-single*/false,
+ req.system(),
+ req.concurrency(),
+ req.isolation(),
+ req.timeout(),
+ req.isInvalidate(),
+ false,
+ req.txSize(),
+ req.groupLockKey(),
+ req.partitionLock(),
+ req.transactionNodes(),
+ req.subjectId(),
+ req.taskNameHash()
+ );
+
+ tx = ctx.tm().onCreated(tx);
+
+ if (tx != null)
+ tx.topologyVersion(req.topologyVersion());
+ else
+ U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
+ tx.xid() + ", req=" + req + ']');
+ }
+
+ if (tx != null) {
+ IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
+ req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
+ req.lastBackups());
+
+ if (tx.isRollbackOnly()) {
+ try {
+ tx.rollback();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to rollback transaction: " + tx, e);
+ }
+ }
+
+ final GridDhtTxLocal<K, V> tx0 = tx;
+
+ fut.listenAsync(new CI1<IgniteFuture<IgniteTxEx<K, V>>>() {
+ @Override public void apply(IgniteFuture<IgniteTxEx<K, V>> txFut) {
+ try {
+ txFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ tx0.setRollbackOnly(); // Just in case.
+
+ if (!(e instanceof IgniteTxOptimisticException))
+ U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
+ }
+ }
+ });
+
+ return fut;
+ }
+ else
+ return new GridFinishedFuture<>(ctx.kernalContext(), (IgniteTxEx<K, V>)null);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
+ GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc()
+ .<IgniteTxEx<K, V>>future(res.version(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
+ ctx.tm().onFinishedRemote(nodeId, res.threadId());
+
+ GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(
+ res.xid(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']');
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
+ GridDhtTxPrepareFuture<K, V> fut = (GridDhtTxPrepareFuture<K, V>)ctx.mvcc().
+ <IgniteTxEx<K, V>>future(res.version(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
+ assert nodeId != null;
+ assert res != null;
+
+ GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(res.xid(),
+ res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ * @return Future.
+ */
+ @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) {
+ return finish(nodeId, null, req);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ * @return Future.
+ */
+ @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+ GridNearTxFinishRequest<K, V> req) {
+ assert nodeId != null;
+ assert req != null;
+
+ // Transaction on local cache only.
+ if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
+ return new GridFinishedFutureEx<IgniteTx>(locTx);
+
+ if (log.isDebugEnabled())
+ log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
+
+ IgniteFuture<IgniteTx> colocatedFinishFut = null;
+
+ if (locTx != null && locTx.colocatedLocallyMapped())
+ colocatedFinishFut = finishColocatedLocal(req.commit(), locTx);
+
+ IgniteFuture<IgniteTx> nearFinishFut = null;
+
+ if (locTx == null || locTx.nearLocallyMapped()) {
+ if (locTx != null)
+ req.cloneEntries();
+
+ nearFinishFut = finishDhtLocal(nodeId, locTx, req);
+ }
+
+ if (colocatedFinishFut != null && nearFinishFut != null) {
+ GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext());
+
+ res.add(colocatedFinishFut);
+ res.add(nearFinishFut);
+
+ res.markInitialized();
+
+ return res;
+ }
+
+ if (colocatedFinishFut != null)
+ return colocatedFinishFut;
+
+ return nearFinishFut;
+ }
+
+ /**
+ * @param nodeId Node ID initiated commit.
+ * @param locTx Optional local transaction.
+ * @param req Finish request.
+ * @return Finish future.
+ */
+ private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
+ GridNearTxFinishRequest<K, V> req) {
+ GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
+
+ GridDhtTxLocal<K, V> tx = null;
+
+ if (dhtVer == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req);
+ }
+ else
+ tx = ctx.tm().tx(dhtVer);
+
+ if (tx == null && !req.explicitLock()) {
+ assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx;
+
+ U.warn(log, "Received finish request for completed transaction (the message may be too late " +
+ "and transaction could have been DGCed by now) [commit=" + req.commit() +
+ ", xid=" + req.version() + ']');
+
+ // Always send finish response.
+ GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(),
+ req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
+
+ try {
+ ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ }
+ catch (Throwable e) {
+ // Double-check.
+ if (ctx.discovery().node(nodeId) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
+ ']');
+ }
+ else
+ U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
+ "res=" + res + ']', e);
+ }
+
+ return null;
+ }
+
+ try {
+ if (req.commit()) {
+ if (tx == null) {
+ // Create transaction and add entries.
+ tx = ctx.tm().onCreated(
+ new GridDhtTxLocal<>(
+ ctx,
+ nodeId,
+ req.version(),
+ req.futureId(),
+ req.miniId(),
+ req.threadId(),
+ true,
+ false, /* we don't know, so assume false. */
+ req.system(),
+ PESSIMISTIC,
+ READ_COMMITTED,
+ /*timeout */0,
+ req.isInvalidate(),
+ req.storeEnabled(),
+ req.txSize(),
+ req.groupLockKey(),
+ false,
+ null,
+ req.subjectId(),
+ req.taskNameHash()));
+
+ if (tx == null || !ctx.tm().onStarted(tx))
+ throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + req);
+
+ tx.topologyVersion(req.topologyVersion());
+ }
+
+ tx.storeEnabled(req.storeEnabled());
+
+ if (!tx.markFinalizing(USER_FINISH)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+
+ return null;
+ }
+
+ if (!tx.syncCommit())
+ tx.syncCommit(req.syncCommit());
+
+ tx.nearFinishFutureId(req.futureId());
+ tx.nearFinishMiniId(req.miniId());
+ tx.recoveryWrites(req.recoveryWrites());
+
+ Collection<IgniteTxEntry<K, V>> writeEntries = req.writes();
+
+ if (!F.isEmpty(writeEntries)) {
+ // In OPTIMISTIC mode, we get the values at PREPARE stage.
+ assert tx.concurrency() == PESSIMISTIC;
+
+ for (IgniteTxEntry<K, V> entry : writeEntries)
+ tx.addEntry(req.messageId(), entry);
+ }
+
+ if (tx.pessimistic())
+ tx.prepare();
+
+ IgniteFuture<IgniteTx> commitFut = tx.commitAsync();
+
+ // Only for error logging.
+ commitFut.listenAsync(CU.errorLogger(log));
+
+ return commitFut;
+ }
+ else {
+ assert tx != null : "Transaction is null for near rollback request [nodeId=" +
+ nodeId + ", req=" + req + "]";
+
+ tx.syncRollback(req.syncRollback());
+
+ tx.nearFinishFutureId(req.futureId());
+ tx.nearFinishMiniId(req.miniId());
+
+ IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+
+ // Only for error logging.
+ rollbackFut.listenAsync(CU.errorLogger(log));
+
+ return rollbackFut;
+ }
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+
+ if (tx != null) {
+ IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
+
+ // Only for error logging.
+ rollbackFut.listenAsync(CU.errorLogger(log));
+
+ return rollbackFut;
+ }
+
+ return new GridFinishedFuture<>(ctx.kernalContext(), e);
+ }
+ }
+
+ /**
+ * @param commit Commit flag (rollback if {@code false}).
+ * @param tx Transaction to commit.
+ * @return Future.
+ */
+ public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) {
+ try {
+ if (commit) {
+ if (!tx.markFinalizing(USER_FINISH)) {
+ if (log.isDebugEnabled())
+ log.debug("Will not finish transaction (it is handled by another thread): " + tx);
+
+ return null;
+ }
+
+ return tx.commitAsyncLocal();
+ }
+ else
+ return tx.rollbackAsyncLocal();
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
+
+ if (tx != null)
+ return tx.rollbackAsync();
+
+ return new GridFinishedFuture<>(ctx.kernalContext(), e);
+ }
+ }
+
+ /**
+ * @param nodeId Sender node ID.
+ * @param req Request.
+ */
+ protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest<K, V> req) {
+ assert nodeId != null;
+ assert req != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() +
+ ", nodeId=" + nodeId + ", req=" + req + ']');
+
+ GridDhtTxRemote<K, V> dhtTx = null;
+ GridNearTxRemote<K, V> nearTx = null;
+
+ GridDhtTxPrepareResponse<K, V> res;
+
+ try {
+ res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId());
+
+ // Start near transaction first.
+ nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
+ dhtTx = startRemoteTx(nodeId, req, res);
+
+ // Set evicted keys from near transaction.
+ if (nearTx != null)
+ res.nearEvicted(nearTx.evicted());
+
+ if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
+ res.invalidPartitions(dhtTx.invalidPartitions());
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof IgniteTxRollbackException)
+ U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e);
+ else if (e instanceof IgniteTxOptimisticException) {
+ if (log.isDebugEnabled())
+ log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx);
+ }
+ else
+ U.error(log, "Failed to process prepare request: " + req, e);
+
+ if (nearTx != null)
+ nearTx.rollback();
+
+ if (dhtTx != null)
+ dhtTx.rollback();
+
+ res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e);
+ }
+
+ try {
+ // Reply back to sender.
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyException) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId +
+ ", xid=" + req.version());
+ }
+ else
+ U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId +
+ ", xid=" + req.version() + ", err=" + e.getMessage() + ']');
+
+ if (nearTx != null)
+ nearTx.rollback();
+
+ if (dhtTx != null)
+ dhtTx.rollback();
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ @SuppressWarnings({"unchecked"})
+ protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest<K, V> req) {
+ assert nodeId != null;
+ assert req != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
+
+ GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version());
+ GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version());
+
+ try {
+ if (dhtTx == null && !F.isEmpty(req.writes()))
+ dhtTx = startRemoteTxForFinish(nodeId, req);
+
+ if (dhtTx != null) {
+ dhtTx.syncCommit(req.syncCommit());
+ dhtTx.syncRollback(req.syncRollback());
+ }
+
+ // One-phase commit transactions send finish requests to backup nodes.
+ if (dhtTx != null && req.onePhaseCommit()) {
+ dhtTx.onePhaseCommit(true);
+
+ dhtTx.writeVersion(req.writeVersion());
+ }
+
+ if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock())
+ nearTx = startNearRemoteTxForFinish(nodeId, req);
+
+ if (nearTx != null) {
+ nearTx.syncCommit(req.syncCommit());
+ nearTx.syncRollback(req.syncRollback());
+ }
+ }
+ catch (IgniteTxRollbackException e) {
+ if (log.isDebugEnabled())
+ log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" +
+ e.getMessage() + ']');
+
+ sendReply(nodeId, req);
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" +
+ dhtTx + ", nearTx=" + nearTx + ']', e);
+
+ if (dhtTx != null)
+ dhtTx.invalidate(true);
+
+ if (nearTx != null)
+ nearTx.invalidate(true);
+ }
+ catch (GridDistributedLockCancelledException ignore) {
+ U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" +
+ dhtTx + ", nearTx=" + nearTx + ']');
+
+ if (dhtTx != null)
+ dhtTx.invalidate(true);
+
+ if (nearTx != null)
+ nearTx.invalidate(true);
+ }
+
+ // Safety - local transaction will finish explicitly.
+ if (nearTx != null && nearTx.local())
+ nearTx = null;
+
- finish(nodeId, dhtTx, req, req.writes());
++ finish(nodeId, dhtTx, req, req.writes(), req.ttls());
+
+ if (nearTx != null)
- finish(nodeId, nearTx, req, req.nearWrites());
++ finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls());
+
+ sendReply(nodeId, req);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param tx Transaction.
+ * @param req Request.
+ * @param writes Writes.
++ * @param ttls TTLs for optimistic transaction.
+ */
+ protected void finish(
+ UUID nodeId,
+ IgniteTxRemoteEx<K, V> tx,
+ GridDhtTxFinishRequest<K, V> req,
- Collection<IgniteTxEntry<K, V>> writes) {
++ Collection<IgniteTxEntry<K, V>> writes,
++ @Nullable GridLongList ttls) {
+ // We don't allow explicit locks for transactions and
+ // therefore immediately return if transaction is null.
+ // However, we may decide to relax this restriction in
+ // future.
+ if (tx == null) {
+ if (req.commit())
+ // Must be some long time duplicate, but we add it anyway.
+ ctx.tm().addCommittedTx(req.version(), null);
+ else
+ ctx.tm().addRolledbackTx(req.version());
+
+ if (log.isDebugEnabled())
+ log.debug("Received finish request for non-existing transaction (added to completed set) " +
+ "[senderNodeId=" + nodeId + ", res=" + req + ']');
+
+ return;
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
+ ", tx=" + tx + ']');
+
++ assert ttls == null || tx.concurrency() == OPTIMISTIC;
++
+ try {
+ if (req.commit() || req.isSystemInvalidate()) {
+ if (tx.commitVersion(req.commitVersion())) {
+ tx.invalidate(req.isInvalidate());
+ tx.systemInvalidate(req.isSystemInvalidate());
+
+ if (!F.isEmpty(writes)) {
+ // In OPTIMISTIC mode, we get the values at PREPARE stage.
+ assert tx.concurrency() == PESSIMISTIC;
+
+ for (IgniteTxEntry<K, V> entry : writes) {
+ if (log.isDebugEnabled())
+ log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" +
+ entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']');
+
+ if (!tx.setWriteValue(entry))
+ U.warn(log, "Received entry to commit that was not present in transaction [entry=" +
+ entry + ", tx=" + tx + ']');
+ }
+ }
++ else if (tx.concurrency() == OPTIMISTIC && ttls != null) {
++ int idx = 0;
++
++ for (GridCacheTxEntry<K, V> e : tx.writeEntries())
++ e.ttl(ttls.get(idx));
++ }
+
+ // Complete remote candidates.
+ tx.doneRemote(req.baseVersion(), null, null, null);
+
+ if (tx.pessimistic())
+ tx.prepare();
+
+ tx.commit();
+ }
+ }
+ else {
+ assert tx != null;
+
+ tx.doneRemote(req.baseVersion(), null, null, null);
+
+ tx.rollback();
+ }
+ }
+ catch (Throwable e) {
+ U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
+
+ // Mark transaction for invalidate.
+ tx.invalidate(true);
+ tx.systemInvalidate(true);
+
+ try {
+ tx.commit();
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to invalidate transaction: " + tx, ex);
+ }
+ }
+ }
+
+ /**
+ * Sends tx finish response to remote node, if response is requested.
+ *
+ * @param nodeId Node id that originated finish request.
+ * @param req Request.
+ */
+ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) {
+ if (req.replyRequired()) {
+ GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
+
+ try {
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ }
+ catch (Throwable e) {
+ // Double-check.
+ if (ctx.discovery().node(nodeId) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
+ }
+ else
+ U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ * @param res Response.
+ * @return Remote transaction.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
+ GridDhtTxPrepareRequest<K, V> req,
+ GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+ if (!F.isEmpty(req.writes())) {
+ GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version());
+
+ assert F.isEmpty(req.candidatesByKey());
+
+ if (tx == null) {
+ tx = new GridDhtTxRemote<>(
+ ctx,
+ req.nearNodeId(),
+ req.futureId(),
+ nodeId,
+ req.threadId(),
+ req.topologyVersion(),
+ req.version(),
+ req.commitVersion(),
+ req.system(),
+ req.concurrency(),
+ req.isolation(),
+ req.isInvalidate(),
+ req.timeout(),
+ req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
+ req.groupLockKey(),
+ req.nearXidVersion(),
+ req.transactionNodes(),
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx = ctx.tm().onCreated(tx);
+
+ if (tx == null || !ctx.tm().onStarted(tx)) {
+ if (log.isDebugEnabled())
+ log.debug("Attempt to start a completed transaction (will ignore): " + tx);
+
+ return null;
+ }
+ }
+
+ if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
+ int idx = 0;
+
+ for (IgniteTxEntry<K, V> entry : req.writes()) {
+ GridCacheContext<K, V> cacheCtx = entry.context();
+
+ tx.addWrite(entry, ctx.deploy().globalLoader());
+
+ if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx))
+ invalidateNearEntry(cacheCtx, entry.key(), req.version());
+
+ try {
+ if (req.needPreloadKey(idx)) {
+ GridCacheEntryEx<K, V> cached = entry.cached();
+
+ if (cached == null)
+ cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
+
+ GridCacheEntryInfo<K, V> info = cached.info();
+
+ if (info != null && !info.isNew() && !info.isDeleted())
+ res.addPreloadEntry(info);
+ }
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ tx.addInvalidPartition(cacheCtx, e.partition());
+
+ tx.clearEntry(entry.txKey());
+ }
+
+ idx++;
+ }
+ }
+
+ // Prepare prior to reordering, so the pending locks added
+ // in prepare phase will get properly ordered as well.
+ tx.prepare();
+
+ if (req.last())
+ tx.state(PREPARED);
+
+ res.invalidPartitions(tx.invalidPartitions());
+
+ if (tx.empty() && req.last()) {
+ tx.rollback();
+
+ return null;
+ }
+
+ return tx;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param key Key
+ * @param ver Version.
+ * @throws IgniteCheckedException If invalidate failed.
+ */
+ private void invalidateNearEntry(GridCacheContext<K, V> cacheCtx, K key, GridCacheVersion ver)
+ throws IgniteCheckedException {
+ GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
+
+ GridCacheEntryEx<K, V> nearEntry = near.peekEx(key);
+
+ if (nearEntry != null)
+ nearEntry.invalidate(null, ver);
+ }
+
+ /**
+ * Called while processing dht tx prepare request.
+ *
+ * @param ldr Loader.
+ * @param nodeId Sender node ID.
+ * @param req Request.
+ * @return Remote transaction.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public GridNearTxRemote<K, V> startNearRemoteTx(ClassLoader ldr, UUID nodeId,
+ GridDhtTxPrepareRequest<K, V> req) throws IgniteCheckedException {
+ assert F.isEmpty(req.candidatesByKey());
+
+ if (!F.isEmpty(req.nearWrites())) {
+ GridNearTxRemote<K, V> tx = ctx.tm().nearTx(req.version());
+
+ if (tx == null) {
+ tx = new GridNearTxRemote<>(
+ ctx,
+ ldr,
+ nodeId,
+ req.nearNodeId(),
+ req.threadId(),
+ req.version(),
+ req.commitVersion(),
+ req.system(),
+ req.concurrency(),
+ req.isolation(),
+ req.isInvalidate(),
+ req.timeout(),
+ req.nearWrites(),
+ req.txSize(),
+ req.groupLockKey(),
+ req.subjectId(),
+ req.taskNameHash()
+ );
+
+ if (!tx.empty()) {
+ tx = ctx.tm().onCreated(tx);
+
+ if (tx == null || !ctx.tm().onStarted(tx))
+ throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + tx);
+ }
+ }
+ else
+ tx.addEntries(ldr, req.nearWrites());
+
+ tx.ownedVersions(req.owned());
+
+ // Prepare prior to reordering, so the pending locks added
+ // in prepare phase will get properly ordered as well.
+ tx.prepare();
+
+ return tx;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @return Remote transaction.
+ * @throws IgniteCheckedException If failed.
+ * @throws GridDistributedLockCancelledException If lock has been cancelled.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+ throws IgniteCheckedException, GridDistributedLockCancelledException {
+
+ GridDhtTxRemote<K, V> tx = null;
+
+ boolean marked = false;
+
+ for (IgniteTxEntry<K, V> txEntry : req.writes()) {
+ GridDistributedCacheEntry<K, V> entry = null;
+
+ GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ int part = cacheCtx.affinity().partition(txEntry.key());
+
+ GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part,
+ req.topologyVersion(), false);
+
+ // Handle implicit locks for pessimistic transactions.
+ if (tx == null)
+ tx = ctx.tm().tx(req.version());
+
+ if (locPart == null || !locPart.reserve()) {
+ if (log.isDebugEnabled())
+ log.debug("Local partition for given key is already evicted (will remove from tx) " +
+ "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']');
+
+ if (tx != null)
+ tx.clearEntry(txEntry.txKey());
+
+ break;
+ }
+
+ try {
+ entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(),
+ req.topologyVersion());
+
+ if (tx == null) {
+ tx = new GridDhtTxRemote<>(
+ ctx,
+ req.nearNodeId(),
+ req.futureId(),
+ nodeId,
+ // We can pass null as nearXidVersion as transaction will be committed right away.
+ null,
+ req.threadId(),
+ req.topologyVersion(),
+ req.version(),
+ /*commitVer*/null,
+ req.system(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.isInvalidate(),
+ 0,
+ req.txSize(),
+ req.groupLockKey(),
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx = ctx.tm().onCreated(tx);
+
+ if (tx == null || !ctx.tm().onStarted(tx))
+ throw new IgniteTxRollbackException("Failed to acquire lock " +
+ "(transaction has been completed): " + req.version());
+ }
+
+ tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
+ txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
+
+ if (!marked) {
+ if (tx.markFinalizing(USER_FINISH))
+ marked = true;
+ else {
+ tx.clearEntry(txEntry.txKey());
+
+ return null;
+ }
+ }
+
+ // Add remote candidate before reordering.
+ if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+ entry.addRemote(
+ req.nearNodeId(),
+ nodeId,
+ req.threadId(),
+ req.version(),
+ 0,
+ /*tx*/true,
+ tx.implicitSingle(),
+ null
+ );
+
+ // Double-check in case if sender node left the grid.
+ if (ctx.discovery().node(req.nearNodeId()) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+
+ tx.rollback();
+
+ return null;
+ }
+
+ // Entry is legit.
+ break;
+ }
+ finally {
+ locPart.release();
+ }
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+ entry;
+
+ if (log.isDebugEnabled())
+ log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+
+ tx.clearEntry(txEntry.txKey());
+
+ if (log.isDebugEnabled())
+ log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+ entry + ", tx=" + tx + ']');
+ }
+ catch (GridDhtInvalidPartitionException p) {
+ if (log.isDebugEnabled())
+ log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" +
+ req + ", txEntry=" + txEntry + ']');
+
+ if (tx != null)
+ tx.clearEntry(txEntry.txKey());
+
+ break;
+ }
+ }
+ }
+
+ if (tx != null && tx.empty()) {
+ tx.rollback();
+
+ return null;
+ }
+
+ return tx;
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @return Remote transaction.
+ * @throws IgniteCheckedException If failed.
+ * @throws GridDistributedLockCancelledException If lock has been cancelled.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
+ throws IgniteCheckedException, GridDistributedLockCancelledException {
+ assert req.groupLock();
+
+ GridNearTxRemote<K, V> tx = null;
+
+ ClassLoader ldr = ctx.deploy().globalLoader();
+
+ if (ldr != null) {
+ boolean marked = false;
+
+ for (IgniteTxEntry<K, V> txEntry : req.nearWrites()) {
+ GridDistributedCacheEntry<K, V> entry = null;
+
+ GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+ while (true) {
+ try {
+ entry = cacheCtx.near().peekExx(txEntry.key());
+
+ if (entry != null) {
+ entry.keyBytes(txEntry.keyBytes());
+
+ // Handle implicit locks for pessimistic transactions.
+ if (tx == null)
+ tx = ctx.tm().nearTx(req.version());
+
+ if (tx == null) {
+ tx = new GridNearTxRemote<>(
+ ctx,
+ nodeId,
+ req.nearNodeId(),
+ // We can pass null as nearXidVer as transaction will be committed right away.
+ null,
+ req.threadId(),
+ req.version(),
+ null,
+ req.system(),
+ PESSIMISTIC,
+ req.isolation(),
+ req.isInvalidate(),
+ 0,
+ req.txSize(),
+ req.groupLockKey(),
+ req.subjectId(),
+ req.taskNameHash());
+
+ tx = ctx.tm().onCreated(tx);
+
+ if (tx == null || !ctx.tm().onStarted(tx))
+ throw new IgniteTxRollbackException("Failed to acquire lock " +
+ "(transaction has been completed): " + req.version());
+
+ if (!marked)
+ marked = tx.markFinalizing(USER_FINISH);
+
+ if (!marked)
+ return null;
+ }
+
+ if (tx.local())
+ return null;
+
+ if (!marked)
+ marked = tx.markFinalizing(USER_FINISH);
+
+ if (marked)
+ tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(),
+ txEntry.valueBytes(), txEntry.drVersion());
+ else
+ return null;
+
+ if (req.groupLock()) {
+ tx.markGroupLock();
+
+ if (!txEntry.groupLockEntry())
+ tx.groupLockKey(txEntry.txKey());
+ }
+
+ // Add remote candidate before reordering.
+ if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
+ entry.addRemote(
+ req.nearNodeId(),
+ nodeId,
+ req.threadId(),
+ req.version(),
+ 0,
+ /*tx*/true,
+ tx.implicitSingle(),
+ null
+ );
+ }
+
+ // Double-check in case if sender node left the grid.
+ if (ctx.discovery().node(req.nearNodeId()) == null) {
+ if (log.isDebugEnabled())
+ log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
+
+ if (tx != null)
+ tx.rollback();
+
+ return null;
+ }
+
+ // Entry is legit.
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
+ entry;
+
+ if (log.isDebugEnabled())
+ log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
+
+ if (tx != null) {
+ tx.clearEntry(txEntry.txKey());
+
+ if (log.isDebugEnabled())
+ log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
+ entry + ", tx=" + tx + ']');
+ }
+
+ // Will retry in while loop.
+ }
+ }
+ }
+ }
+ else {
+ String err = "Failed to acquire deployment class loader for message: " + req;
+
+ U.warn(log, err);
+
+ throw new IgniteCheckedException(err);
+ }
+
+ return tx;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
+ if (log.isDebugEnabled())
+ log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
+
+ boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+
+ GridCacheOptimisticCheckPreparedTxResponse<K, V> res =
+ new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), req.futureId(), req.miniId(), prepared);
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ ctx.io().send(nodeId, res);
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" +
+ nodeId + ", res=" + res + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
+ if (log.isDebugEnabled())
+ log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = (GridCacheOptimisticCheckPreparedTxFuture<K, V>)ctx.mvcc().
+ <Boolean>future(res.version(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ protected void processCheckCommittedTxRequest(final UUID nodeId,
+ final GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
+ if (log.isDebugEnabled())
+ log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']');
+
+ IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req);
+
+ infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() {
+ @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) {
+ GridCacheCommittedTxInfo<K, V> info = null;
+
+ try {
+ info = infoFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e);
+ }
+
+ GridCachePessimisticCheckCommittedTxResponse<K, V>
+ res = new GridCachePessimisticCheckCommittedTxResponse<>(
+ req.version(), req.futureId(), req.miniId(), info);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']');
+
+ sendCheckCommittedResponse(nodeId, res); }
+ });
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ protected void processCheckCommittedTxResponse(UUID nodeId,
+ GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+ if (log.isDebugEnabled())
+ log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ GridCachePessimisticCheckCommittedTxFuture<K, V> fut =
+ (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future(
+ res.version(), res.futureId());
+
+ if (fut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received response for unknown future (will ignore): " + res);
+
+ return;
+ }
+
+ fut.onResult(nodeId, res);
+ }
+
+ /**
+ * Sends check committed response to remote node.
+ *
+ * @param nodeId Node ID to send to.
+ * @param res Reponse to send.
+ */
+ private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
+
+ ctx.io().send(nodeId, res);
+ }
+ catch (ClusterTopologyException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" +
+ nodeId + ", res=" + res + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+ }
+ }
+ }