You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/22 00:04:35 UTC
[42/46] incubator-ignite git commit: GG-9141 - Renaming.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
deleted file mode 100644
index 2861d66..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.processors.timeout.*;
-import org.gridgain.grid.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Transaction managed by cache ({@code 'Ex'} stands for external).
- */
-public interface GridCacheTxEx<K, V> extends IgniteTx, GridTimeoutObject {
- @SuppressWarnings("PublicInnerClass")
- public enum FinalizationStatus {
- /** Transaction was not finalized yet. */
- NONE,
-
- /** Transaction is being finalized by user. */
- USER_FINISH,
-
- /** Recovery request is received, user finish requests should be ignored. */
- RECOVERY_WAIT,
-
- /** Transaction is being finalized by recovery procedure. */
- RECOVERY_FINISH
- }
-
- /**
- * @return Size of the transaction.
- */
- public int size();
-
- /**
- * @return {@code True} if transaction is allowed to use store.
- */
- public boolean storeEnabled();
-
- /**
- * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
- * store enabled.
- */
- public boolean storeUsed();
-
- /**
- * Checks if this is system cache transaction. System transactions are isolated from user transactions
- * because some of the public API methods may be invoked inside user transactions and internally start
- * system cache transactions.
- *
- * @return {@code True} if transaction is started for system cache.
- */
- public boolean system();
-
- /**
- * @return Last recorded topology version.
- */
- public long topologyVersion();
-
- /**
- * @return Flag indicating whether transaction is implicit with only one key.
- */
- public boolean implicitSingle();
-
- /**
- * @return Collection of cache IDs involved in this transaction.
- */
- public Collection<Integer> activeCacheIds();
-
- /**
- * Attempts to set topology version and returns the current value.
- * If topology version was previously set, then it's value will
- * be returned (but not updated).
- *
- * @param topVer Topology version.
- * @return Recorded topology version.
- */
- public long topologyVersion(long topVer);
-
- /**
- * @return {@code True} if transaction is empty.
- */
- public boolean empty();
-
- /**
- * @return {@code True} if transaction group-locked.
- */
- public boolean groupLock();
-
- /**
- * @return Group lock key if {@link #groupLock()} is {@code true}.
- */
- @Nullable public GridCacheTxKey groupLockKey();
-
- /**
- * @return {@code True} if preparing flag was set with this call.
- */
- public boolean markPreparing();
-
- /**
- * @param status Finalization status to set.
- * @return {@code True} if could mark was set.
- */
- public boolean markFinalizing(FinalizationStatus status);
-
- /**
- * @param part Invalid partition.
- */
- public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part);
-
- /**
- * @return Invalid partitions.
- */
- public Set<Integer> invalidPartitions();
-
- /**
- * Gets owned version for near remote transaction.
- *
- * @param key Key to get version for.
- * @return Owned version, if any.
- */
- @Nullable public GridCacheVersion ownedVersion(GridCacheTxKey<K> key);
-
- /**
- * Gets ID of additional node involved. For example, in DHT case, other node is
- * near node ID.
- *
- * @return Parent node IDs.
- */
- @Nullable public UUID otherNodeId();
-
- /**
- * @return Event node ID.
- */
- public UUID eventNodeId();
-
- /**
- * Gets node ID which directly started this transaction. In case of DHT local transaction it will be
- * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote
- * transaction it will be starter node ID.
- *
- * @return Originating node ID.
- */
- public UUID originatingNodeId();
-
- /**
- * @return Master node IDs.
- */
- public Collection<UUID> masterNodeIds();
-
- /**
- * @return Near transaction ID.
- */
- @Nullable public GridCacheVersion nearXidVersion();
-
- /**
- * @return Transaction nodes mapping (primary node -> related backup nodes).
- */
- @Nullable public Map<UUID, Collection<UUID>> transactionNodes();
-
- /**
- * @param entry Entry to check.
- * @return {@code True} if lock is owned.
- * @throws GridCacheEntryRemovedException If entry has been removed.
- */
- public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException;
-
- /**
- * @param entry Entry to check.
- * @return {@code True} if lock is owned.
- */
- public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry);
-
- /**
- * For Partitioned caches, this flag is {@code false} for remote DHT and remote NEAR
- * transactions because serializability of transaction is enforced on primary node. All
- * other transaction types must enforce it.
- *
- * @return Enforce serializable flag.
- */
- public boolean enforceSerializable();
-
- /**
- * @return {@code True} if near transaction.
- */
- public boolean near();
-
- /**
- * @return {@code True} if DHT transaction.
- */
- public boolean dht();
-
- /**
- * @return {@code True} if dht colocated transaction.
- */
- public boolean colocated();
-
- /**
- * @return {@code True} if transaction is local, {@code false} if it's remote.
- */
- public boolean local();
-
- /**
- * @return {@code True} if transaction is replicated.
- */
- public boolean replicated();
-
- /**
- * @return Subject ID initiated this transaction.
- */
- public UUID subjectId();
-
- /**
- * Task name hash in case if transaction was initiated within task execution.
- *
- * @return Task name hash.
- */
- public int taskNameHash();
-
- /**
- * @return {@code True} if transaction is user transaction, which means:
- * <ul>
- * <li>Explicit</li>
- * <li>Local</li>
- * <li>Not DHT</li>
- * </ul>
- */
- public boolean user();
-
- /**
- * @return {@code True} if transaction is configured with synchronous commit flag.
- */
- public boolean syncCommit();
-
- /**
- * @return {@code True} if transaction is configured with synchronous rollback flag.
- */
- public boolean syncRollback();
-
- /**
- * @param key Key to check.
- * @return {@code True} if key is present.
- */
- public boolean hasWriteKey(GridCacheTxKey<K> key);
-
- /**
- * @return Read set.
- */
- public Set<GridCacheTxKey<K>> readSet();
-
- /**
- * @return Write set.
- */
- public Set<GridCacheTxKey<K>> writeSet();
-
- /**
- * @return All transaction entries.
- */
- public Collection<GridCacheTxEntry<K, V>> allEntries();
-
- /**
- * @return Write entries.
- */
- public Collection<GridCacheTxEntry<K, V>> writeEntries();
-
- /**
- * @return Read entries.
- */
- public Collection<GridCacheTxEntry<K, V>> readEntries();
-
- /**
- * @return Transaction write map.
- */
- public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> writeMap();
-
- /**
- * @return Transaction read map.
- */
- public Map<GridCacheTxKey<K>, GridCacheTxEntry<K, V>> readMap();
-
- /**
- * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests.
- *
- * @return Collection of recovery writes.
- */
- public Collection<GridCacheTxEntry<K, V>> recoveryWrites();
-
- /**
- * Gets a list of entries that needs to be locked on the next step of prepare stage of
- * optimistic transaction.
- *
- * @return List of tx entries for optimistic locking.
- */
- public Collection<GridCacheTxEntry<K, V>> optimisticLockEntries();
-
- /**
- * Seals transaction for updates.
- */
- public void seal();
-
- /**
- * @param key Key for the entry.
- * @return Entry for the key (either from write set or read set).
- */
- @Nullable public GridCacheTxEntry<K, V> entry(GridCacheTxKey<K> key);
-
- /**
- * @param failFast Fail-fast flag.
- * @param key Key to look up.
- * @param filter Filter to check.
- * @return Current value for the key within transaction.
- * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}.
- */
- @Nullable public GridTuple<V> peek(
- GridCacheContext<K, V> ctx,
- boolean failFast,
- K key,
- @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws GridCacheFilterFailedException;
-
- /**
- * @return Start version.
- */
- public GridCacheVersion startVersion();
-
- /**
- * @return Transaction version.
- */
- public GridCacheVersion xidVersion();
-
- /**
- * @return Version created at commit time.
- */
- public GridCacheVersion commitVersion();
-
- /**
- * @param commitVer Commit version.
- * @return {@code True} if version was set.
- */
- public boolean commitVersion(GridCacheVersion commitVer);
-
- /**
- * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
- * assigned to this transaction at the end of write phase.
- */
- public GridCacheVersion endVersion();
-
- /**
- * Prepare state.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void prepare() throws IgniteCheckedException;
-
- /**
- * Prepare stage.
- *
- * @return Future for prepare step.
- */
- public IgniteFuture<GridCacheTxEx<K, V>> prepareAsync();
-
- /**
- * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
- * assigned to this transaction at the end of write phase.
- */
- public void endVersion(GridCacheVersion endVer);
-
- /**
- * @return Transaction write version. For all transactions except DHT transactions, will be equal to
- * {@link #xidVersion()}.
- */
- public GridCacheVersion writeVersion();
-
- /**
- * Sets write version.
- *
- * @param ver Write version.
- */
- public void writeVersion(GridCacheVersion ver);
-
- /**
- * @return Future for transaction completion.
- */
- public IgniteFuture<IgniteTx> finishFuture();
-
- /**
- * @param state Transaction state.
- * @return {@code True} if transition was valid, {@code false} otherwise.
- */
- public boolean state(IgniteTxState state);
-
- /**
- * @param invalidate Invalidate flag.
- */
- public void invalidate(boolean invalidate);
-
- /**
- * @param sysInvalidate System invalidate flag.
- */
- public void systemInvalidate(boolean sysInvalidate);
-
- /**
- * @return System invalidate flag.
- */
- public boolean isSystemInvalidate();
-
- /**
- * TODO-gg-4004 Put rollback async on public API?
- * Asynchronously rollback this transaction.
- *
- * @return Rollback future.
- */
- public IgniteFuture<IgniteTx> rollbackAsync();
-
- /**
- * Callback invoked whenever there is a lock that has been acquired
- * by this transaction for any of the participating entries.
- *
- * @param entry Cache entry.
- * @param owner Lock candidate that won ownership of the lock.
- * @return {@code True} if transaction cared about notification.
- */
- public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner);
-
- /**
- * @return {@code True} if transaction timed out.
- */
- public boolean timedOut();
-
- /**
- * @return {@code True} if transaction had completed successfully or unsuccessfully.
- */
- public boolean done();
-
- /**
- * @return {@code True} for OPTIMISTIC transactions.
- */
- public boolean optimistic();
-
- /**
- * @return {@code True} for PESSIMISTIC transactions.
- */
- public boolean pessimistic();
-
- /**
- * @return {@code True} if read-committed.
- */
- public boolean readCommitted();
-
- /**
- * @return {@code True} if repeatable-read.
- */
- public boolean repeatableRead();
-
- /**
- * @return {@code True} if serializable.
- */
- public boolean serializable();
-
- /**
- * Checks whether given key has been removed within transaction.
- *
- * @param key Key to check.
- * @return {@code True} if key has been removed.
- */
- public boolean removed(GridCacheTxKey<K> key);
-
- /**
- * Gets allowed remaining time for this transaction.
- *
- * @return Remaining time.
- * @throws IgniteTxTimeoutException If transaction timed out.
- */
- public long remainingTime() throws IgniteTxTimeoutException;
-
- /**
- * @return Alternate transaction versions.
- */
- public Collection<GridCacheVersion> alternateVersions();
-
- /**
- * @return {@code True} if transaction needs completed versions for processing.
- */
- public boolean needsCompletedVersions();
-
- /**
- * @param base Base for committed versions.
- * @param committed Committed transactions relative to base.
- * @param rolledback Rolled back transactions relative to base.
- */
- public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
- Collection<GridCacheVersion> rolledback);
-
- /**
- * @return {@code True} if transaction has at least one internal entry.
- */
- public boolean internal();
-
- /**
- * @return {@code True} if transaction is a one-phase-commit transaction.
- */
- public boolean onePhaseCommit();
-
- /**
- * @return {@code True} if transaction has transform entries. This flag will be only set for local
- * transactions.
- */
- public boolean hasTransforms();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
deleted file mode 100644
index 47631e0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
+++ /dev/null
@@ -1,1492 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
-import org.gridgain.grid.util.future.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
-import static org.apache.ignite.transactions.IgniteTxIsolation.*;
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-
-/**
- * Isolated logic to process cache messages.
- */
-public class GridCacheTxHandler<K, V> {
- /** Logger. */
- private IgniteLogger log;
-
- /** Shared cache context. */
- private GridCacheSharedContext<K, V> ctx;
-
- public IgniteFuture<GridCacheTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId,
- final GridNearTxPrepareRequest<K, V> req) {
- return prepareTx(nearNodeId, null, req);
- }
-
- /**
- * @param ctx Shared cache context.
- */
- public GridCacheTxHandler(GridCacheSharedContext<K, V> ctx) {
- this.ctx = ctx;
-
- log = ctx.logger(GridCacheTxHandler.class);
-
- ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheMessage<K, V> msg) {
- processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse<K, V>)msg);
- }
- });
-
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxRequest.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxRequest<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
- processCheckPreparedTxRequest(nodeId, req);
- }
- });
-
- ctx.io().addHandler(0, GridCacheOptimisticCheckPreparedTxResponse.class,
- new CI2<UUID, GridCacheOptimisticCheckPreparedTxResponse<K, V>>() {
- @Override public void apply(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
- processCheckPreparedTxResponse(nodeId, res);
- }
- });
-
- ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class,
- new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() {
- @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
- processCheckCommittedTxRequest(nodeId, req);
- }
- });
-
- ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class,
- new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() {
- @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
- processCheckCommittedTxResponse(nodeId, res);
- }
- });
- }
-
- /**
- * @param nearNodeId Near node ID that initiated transaction.
- * @param locTx Optional local transaction.
- * @param req Near prepare request.
- * @return Future for transaction.
- */
- public IgniteFuture<GridCacheTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx,
- final GridNearTxPrepareRequest<K, V> req) {
- assert nearNodeId != null;
- assert req != null;
-
- if (locTx != null) {
- if (req.near()) {
- // Make sure not to provide Near entries to DHT cache.
- req.cloneEntries();
-
- return prepareNearTx(nearNodeId, req);
- }
- else
- return prepareColocatedTx(locTx, req);
- }
- else
- return prepareNearTx(nearNodeId, req);
- }
-
- /**
- * Prepares local colocated tx.
- *
- * @param locTx Local transaction.
- * @param req Near prepare request.
- * @return Prepare future.
- */
- private IgniteFuture<GridCacheTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx,
- final GridNearTxPrepareRequest<K, V> req) {
-
- IgniteFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys.
-
- return new GridEmbeddedFuture<>(
- ctx.kernalContext(),
- fut,
- new C2<Object, Exception, IgniteFuture<GridCacheTxEx<K, V>>>() {
- @Override public IgniteFuture<GridCacheTxEx<K, V>> apply(Object o, Exception ex) {
- if (ex != null)
- throw new GridClosureException(ex);
-
- IgniteFuture<GridCacheTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(),
- req.transactionNodes(), req.last(), req.lastBackups());
-
- if (locTx.isRollbackOnly())
- locTx.rollbackAsync();
-
- return fut;
- }
- },
- new C2<GridCacheTxEx<K, V>, Exception, GridCacheTxEx<K, V>>() {
- @Nullable @Override public GridCacheTxEx<K, V> apply(GridCacheTxEx<K, V> tx, Exception e) {
- if (e != null) {
- // tx can be null of exception occurred.
- if (tx != null)
- tx.setRollbackOnly(); // Just in case.
-
- if (!(e instanceof IgniteTxOptimisticException))
- U.error(log, "Failed to prepare DHT transaction: " + tx, e);
- }
-
- return tx;
- }
- }
- );
- }
-
- /**
- * Prepares near transaction.
- *
- * @param nearNodeId Near node ID that initiated transaction.
- * @param req Near prepare request.
- * @return Prepare future.
- */
- private IgniteFuture<GridCacheTxEx<K, V>> prepareNearTx(final UUID nearNodeId,
- final GridNearTxPrepareRequest<K, V> req) {
- ClusterNode nearNode = ctx.node(nearNodeId);
-
- if (nearNode == null) {
- if (log.isDebugEnabled())
- log.debug("Received transaction request from node that left grid (will ignore): " + nearNodeId);
-
- return null;
- }
-
- try {
- for (GridCacheTxEntry<K, V> e : F.concat(false, req.reads(), req.writes()))
- e.unmarshal(ctx, false, ctx.deploy().globalLoader());
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx.kernalContext(), e);
- }
-
- GridDhtTxLocal<K, V> tx;
-
- GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
-
- if (mappedVer != null) {
- tx = ctx.tm().tx(mappedVer);
-
- if (tx == null)
- U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
- + ", mappedVer=" + mappedVer + ']');
- }
- else {
- tx = new GridDhtTxLocal<>(
- ctx,
- nearNode.id(),
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- /*implicit*/false,
- /*implicit-single*/false,
- req.system(),
- req.concurrency(),
- req.isolation(),
- req.timeout(),
- req.isInvalidate(),
- false,
- req.txSize(),
- req.groupLockKey(),
- req.partitionLock(),
- req.transactionNodes(),
- req.subjectId(),
- req.taskNameHash()
- );
-
- tx = ctx.tm().onCreated(tx);
-
- if (tx != null)
- tx.topologyVersion(req.topologyVersion());
- else
- U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
- tx.xid() + ", req=" + req + ']');
- }
-
- if (tx != null) {
- IgniteFuture<GridCacheTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
- req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
- req.lastBackups());
-
- if (tx.isRollbackOnly()) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to rollback transaction: " + tx, e);
- }
- }
-
- final GridDhtTxLocal<K, V> tx0 = tx;
-
- fut.listenAsync(new CI1<IgniteFuture<GridCacheTxEx<K, V>>>() {
- @Override public void apply(IgniteFuture<GridCacheTxEx<K, V>> txFut) {
- try {
- txFut.get();
- }
- catch (IgniteCheckedException e) {
- tx0.setRollbackOnly(); // Just in case.
-
- if (!(e instanceof IgniteTxOptimisticException))
- U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
- }
- }
- });
-
- return fut;
- }
- else
- return new GridFinishedFuture<>(ctx.kernalContext(), (GridCacheTxEx<K, V>)null);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V> res) {
- GridNearTxPrepareFuture<K, V> fut = (GridNearTxPrepareFuture<K, V>)ctx.mvcc()
- .<GridCacheTxEx<K, V>>future(res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
- ctx.tm().onFinishedRemote(nodeId, res.threadId());
-
- GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(
- res.xid(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find future for finish response [sender=" + nodeId + ", res=" + res + ']');
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse<K, V> res) {
- GridDhtTxPrepareFuture<K, V> fut = (GridDhtTxPrepareFuture<K, V>)ctx.mvcc().
- <GridCacheTxEx<K, V>>future(res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
- assert nodeId != null;
- assert res != null;
-
- GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, V>)ctx.mvcc().<IgniteTx>future(res.xid(),
- res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- * @return Future.
- */
- @Nullable public IgniteFuture<IgniteTx> processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest<K, V> req) {
- return finish(nodeId, null, req);
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- * @return Future.
- */
- @Nullable public IgniteFuture<IgniteTx> finish(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
- GridNearTxFinishRequest<K, V> req) {
- assert nodeId != null;
- assert req != null;
-
- // Transaction on local cache only.
- if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
- return new GridFinishedFutureEx<IgniteTx>(locTx);
-
- if (log.isDebugEnabled())
- log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
-
- IgniteFuture<IgniteTx> colocatedFinishFut = null;
-
- if (locTx != null && locTx.colocatedLocallyMapped())
- colocatedFinishFut = finishColocatedLocal(req.commit(), locTx);
-
- IgniteFuture<IgniteTx> nearFinishFut = null;
-
- if (locTx == null || locTx.nearLocallyMapped()) {
- if (locTx != null)
- req.cloneEntries();
-
- nearFinishFut = finishDhtLocal(nodeId, locTx, req);
- }
-
- if (colocatedFinishFut != null && nearFinishFut != null) {
- GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext());
-
- res.add(colocatedFinishFut);
- res.add(nearFinishFut);
-
- res.markInitialized();
-
- return res;
- }
-
- if (colocatedFinishFut != null)
- return colocatedFinishFut;
-
- return nearFinishFut;
- }
-
- /**
- * @param nodeId Node ID initiated commit.
- * @param locTx Optional local transaction.
- * @param req Finish request.
- * @return Finish future.
- */
- private IgniteFuture<IgniteTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal<K, V> locTx,
- GridNearTxFinishRequest<K, V> req) {
- GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
-
- GridDhtTxLocal<K, V> tx = null;
-
- if (dhtVer == null) {
- if (log.isDebugEnabled())
- log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req);
- }
- else
- tx = ctx.tm().tx(dhtVer);
-
- if (tx == null && !req.explicitLock()) {
- assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx;
-
- U.warn(log, "Received finish request for completed transaction (the message may be too late " +
- "and transaction could have been DGCed by now) [commit=" + req.commit() +
- ", xid=" + req.version() + ']');
-
- // Always send finish response.
- GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(),
- req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
-
- try {
- ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
- }
- catch (Throwable e) {
- // Double-check.
- if (ctx.discovery().node(nodeId) == null) {
- if (log.isDebugEnabled())
- log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
- ']');
- }
- else
- U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
- "res=" + res + ']', e);
- }
-
- return null;
- }
-
- try {
- if (req.commit()) {
- if (tx == null) {
- // Create transaction and add entries.
- tx = ctx.tm().onCreated(
- new GridDhtTxLocal<>(
- ctx,
- nodeId,
- req.version(),
- req.futureId(),
- req.miniId(),
- req.threadId(),
- true,
- false, /* we don't know, so assume false. */
- req.system(),
- PESSIMISTIC,
- READ_COMMITTED,
- /*timeout */0,
- req.isInvalidate(),
- req.storeEnabled(),
- req.txSize(),
- req.groupLockKey(),
- false,
- null,
- req.subjectId(),
- req.taskNameHash()));
-
- if (tx == null || !ctx.tm().onStarted(tx))
- throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + req);
-
- tx.topologyVersion(req.topologyVersion());
- }
-
- tx.storeEnabled(req.storeEnabled());
-
- if (!tx.markFinalizing(USER_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not finish transaction (it is handled by another thread): " + tx);
-
- return null;
- }
-
- if (!tx.syncCommit())
- tx.syncCommit(req.syncCommit());
-
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
- tx.recoveryWrites(req.recoveryWrites());
-
- Collection<GridCacheTxEntry<K, V>> writeEntries = req.writes();
-
- if (!F.isEmpty(writeEntries)) {
- // In OPTIMISTIC mode, we get the values at PREPARE stage.
- assert tx.concurrency() == PESSIMISTIC;
-
- for (GridCacheTxEntry<K, V> entry : writeEntries)
- tx.addEntry(req.messageId(), entry);
- }
-
- if (tx.pessimistic())
- tx.prepare();
-
- IgniteFuture<IgniteTx> commitFut = tx.commitAsync();
-
- // Only for error logging.
- commitFut.listenAsync(CU.errorLogger(log));
-
- return commitFut;
- }
- else {
- assert tx != null : "Transaction is null for near rollback request [nodeId=" +
- nodeId + ", req=" + req + "]";
-
- tx.syncRollback(req.syncRollback());
-
- tx.nearFinishFutureId(req.futureId());
- tx.nearFinishMiniId(req.miniId());
-
- IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
-
- // Only for error logging.
- rollbackFut.listenAsync(CU.errorLogger(log));
-
- return rollbackFut;
- }
- }
- catch (Throwable e) {
- U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
-
- if (tx != null) {
- IgniteFuture<IgniteTx> rollbackFut = tx.rollbackAsync();
-
- // Only for error logging.
- rollbackFut.listenAsync(CU.errorLogger(log));
-
- return rollbackFut;
- }
-
- return new GridFinishedFuture<>(ctx.kernalContext(), e);
- }
- }
-
- /**
- * @param commit Commit flag (rollback if {@code false}).
- * @param tx Transaction to commit.
- * @return Future.
- */
- public IgniteFuture<IgniteTx> finishColocatedLocal(boolean commit, GridNearTxLocal<K, V> tx) {
- try {
- if (commit) {
- if (!tx.markFinalizing(USER_FINISH)) {
- if (log.isDebugEnabled())
- log.debug("Will not finish transaction (it is handled by another thread): " + tx);
-
- return null;
- }
-
- return tx.commitAsyncLocal();
- }
- else
- return tx.rollbackAsyncLocal();
- }
- catch (Throwable e) {
- U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e);
-
- if (tx != null)
- return tx.rollbackAsync();
-
- return new GridFinishedFuture<>(ctx.kernalContext(), e);
- }
- }
-
- /**
- * @param nodeId Sender node ID.
- * @param req Request.
- */
- protected final void processDhtTxPrepareRequest(UUID nodeId, GridDhtTxPrepareRequest<K, V> req) {
- assert nodeId != null;
- assert req != null;
-
- if (log.isDebugEnabled())
- log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() +
- ", nodeId=" + nodeId + ", req=" + req + ']');
-
- GridDhtTxRemote<K, V> dhtTx = null;
- GridNearTxRemote<K, V> nearTx = null;
-
- GridDhtTxPrepareResponse<K, V> res;
-
- try {
- res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId());
-
- // Start near transaction first.
- nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
- dhtTx = startRemoteTx(nodeId, req, res);
-
- // Set evicted keys from near transaction.
- if (nearTx != null)
- res.nearEvicted(nearTx.evicted());
-
- if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
- res.invalidPartitions(dhtTx.invalidPartitions());
- }
- catch (IgniteCheckedException e) {
- if (e instanceof IgniteTxRollbackException)
- U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e);
- else if (e instanceof IgniteTxOptimisticException) {
- if (log.isDebugEnabled())
- log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx);
- }
- else
- U.error(log, "Failed to process prepare request: " + req, e);
-
- if (nearTx != null)
- nearTx.rollback();
-
- if (dhtTx != null)
- dhtTx.rollback();
-
- res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e);
- }
-
- try {
- // Reply back to sender.
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- if (e instanceof ClusterTopologyException) {
- if (log.isDebugEnabled())
- log.debug("Failed to send tx response to remote node (node left grid) [node=" + nodeId +
- ", xid=" + req.version());
- }
- else
- U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [node=" + nodeId +
- ", xid=" + req.version() + ", err=" + e.getMessage() + ']');
-
- if (nearTx != null)
- nearTx.rollback();
-
- if (dhtTx != null)
- dhtTx.rollback();
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- @SuppressWarnings({"unchecked"})
- protected final void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest<K, V> req) {
- assert nodeId != null;
- assert req != null;
-
- if (log.isDebugEnabled())
- log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
-
- GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version());
- GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version());
-
- try {
- if (dhtTx == null && !F.isEmpty(req.writes()))
- dhtTx = startRemoteTxForFinish(nodeId, req);
-
- if (dhtTx != null) {
- dhtTx.syncCommit(req.syncCommit());
- dhtTx.syncRollback(req.syncRollback());
- }
-
- // One-phase commit transactions send finish requests to backup nodes.
- if (dhtTx != null && req.onePhaseCommit()) {
- dhtTx.onePhaseCommit(true);
-
- dhtTx.writeVersion(req.writeVersion());
- }
-
- if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock())
- nearTx = startNearRemoteTxForFinish(nodeId, req);
-
- if (nearTx != null) {
- nearTx.syncCommit(req.syncCommit());
- nearTx.syncRollback(req.syncRollback());
- }
- }
- catch (IgniteTxRollbackException e) {
- if (log.isDebugEnabled())
- log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" +
- e.getMessage() + ']');
-
- sendReply(nodeId, req);
-
- return;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" +
- dhtTx + ", nearTx=" + nearTx + ']', e);
-
- if (dhtTx != null)
- dhtTx.invalidate(true);
-
- if (nearTx != null)
- nearTx.invalidate(true);
- }
- catch (GridDistributedLockCancelledException ignore) {
- U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" +
- dhtTx + ", nearTx=" + nearTx + ']');
-
- if (dhtTx != null)
- dhtTx.invalidate(true);
-
- if (nearTx != null)
- nearTx.invalidate(true);
- }
-
- // Safety - local transaction will finish explicitly.
- if (nearTx != null && nearTx.local())
- nearTx = null;
-
- finish(nodeId, dhtTx, req, req.writes());
-
- if (nearTx != null)
- finish(nodeId, nearTx, req, req.nearWrites());
-
- sendReply(nodeId, req);
- }
-
- /**
- * @param nodeId Node ID.
- * @param tx Transaction.
- * @param req Request.
- * @param writes Writes.
- */
- protected void finish(
- UUID nodeId,
- GridCacheTxRemoteEx<K, V> tx,
- GridDhtTxFinishRequest<K, V> req,
- Collection<GridCacheTxEntry<K, V>> writes) {
- // We don't allow explicit locks for transactions and
- // therefore immediately return if transaction is null.
- // However, we may decide to relax this restriction in
- // future.
- if (tx == null) {
- if (req.commit())
- // Must be some long time duplicate, but we add it anyway.
- ctx.tm().addCommittedTx(req.version(), null);
- else
- ctx.tm().addRolledbackTx(req.version());
-
- if (log.isDebugEnabled())
- log.debug("Received finish request for non-existing transaction (added to completed set) " +
- "[senderNodeId=" + nodeId + ", res=" + req + ']');
-
- return;
- }
- else if (log.isDebugEnabled())
- log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
- ", tx=" + tx + ']');
-
- try {
- if (req.commit() || req.isSystemInvalidate()) {
- if (tx.commitVersion(req.commitVersion())) {
- tx.invalidate(req.isInvalidate());
- tx.systemInvalidate(req.isSystemInvalidate());
-
- if (!F.isEmpty(writes)) {
- // In OPTIMISTIC mode, we get the values at PREPARE stage.
- assert tx.concurrency() == PESSIMISTIC;
-
- for (GridCacheTxEntry<K, V> entry : writes) {
- if (log.isDebugEnabled())
- log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" +
- entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']');
-
- if (!tx.setWriteValue(entry))
- U.warn(log, "Received entry to commit that was not present in transaction [entry=" +
- entry + ", tx=" + tx + ']');
- }
- }
-
- // Complete remote candidates.
- tx.doneRemote(req.baseVersion(), null, null, null);
-
- if (tx.pessimistic())
- tx.prepare();
-
- tx.commit();
- }
- }
- else {
- assert tx != null;
-
- tx.doneRemote(req.baseVersion(), null, null, null);
-
- tx.rollback();
- }
- }
- catch (Throwable e) {
- U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e);
-
- // Mark transaction for invalidate.
- tx.invalidate(true);
- tx.systemInvalidate(true);
-
- try {
- tx.commit();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to invalidate transaction: " + tx, ex);
- }
- }
- }
-
- /**
- * Sends tx finish response to remote node, if response is requested.
- *
- * @param nodeId Node id that originated finish request.
- * @param req Request.
- */
- protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) {
- if (req.replyRequired()) {
- GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
-
- try {
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
- }
- catch (Throwable e) {
- // Double-check.
- if (ctx.discovery().node(nodeId) == null) {
- if (log.isDebugEnabled())
- log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
- }
- else
- U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- }
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- * @param res Response.
- * @return Remote transaction.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
- GridDhtTxPrepareRequest<K, V> req,
- GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException {
- if (!F.isEmpty(req.writes())) {
- GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version());
-
- assert F.isEmpty(req.candidatesByKey());
-
- if (tx == null) {
- tx = new GridDhtTxRemote<>(
- ctx,
- req.nearNodeId(),
- req.futureId(),
- nodeId,
- req.threadId(),
- req.topologyVersion(),
- req.version(),
- req.commitVersion(),
- req.system(),
- req.concurrency(),
- req.isolation(),
- req.isInvalidate(),
- req.timeout(),
- req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
- req.groupLockKey(),
- req.nearXidVersion(),
- req.transactionNodes(),
- req.subjectId(),
- req.taskNameHash());
-
- tx = ctx.tm().onCreated(tx);
-
- if (tx == null || !ctx.tm().onStarted(tx)) {
- if (log.isDebugEnabled())
- log.debug("Attempt to start a completed transaction (will ignore): " + tx);
-
- return null;
- }
- }
-
- if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
- int idx = 0;
-
- for (GridCacheTxEntry<K, V> entry : req.writes()) {
- GridCacheContext<K, V> cacheCtx = entry.context();
-
- tx.addWrite(entry, ctx.deploy().globalLoader());
-
- if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx))
- invalidateNearEntry(cacheCtx, entry.key(), req.version());
-
- try {
- if (req.needPreloadKey(idx)) {
- GridCacheEntryEx<K, V> cached = entry.cached();
-
- if (cached == null)
- cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
-
- GridCacheEntryInfo<K, V> info = cached.info();
-
- if (info != null && !info.isNew() && !info.isDeleted())
- res.addPreloadEntry(info);
- }
- }
- catch (GridDhtInvalidPartitionException e) {
- tx.addInvalidPartition(cacheCtx, e.partition());
-
- tx.clearEntry(entry.txKey());
- }
-
- idx++;
- }
- }
-
- // Prepare prior to reordering, so the pending locks added
- // in prepare phase will get properly ordered as well.
- tx.prepare();
-
- if (req.last())
- tx.state(PREPARED);
-
- res.invalidPartitions(tx.invalidPartitions());
-
- if (tx.empty() && req.last()) {
- tx.rollback();
-
- return null;
- }
-
- return tx;
- }
-
- return null;
- }
-
- /**
- * @param key Key
- * @param ver Version.
- * @throws IgniteCheckedException If invalidate failed.
- */
- private void invalidateNearEntry(GridCacheContext<K, V> cacheCtx, K key, GridCacheVersion ver)
- throws IgniteCheckedException {
- GridNearCacheAdapter<K, V> near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
-
- GridCacheEntryEx<K, V> nearEntry = near.peekEx(key);
-
- if (nearEntry != null)
- nearEntry.invalidate(null, ver);
- }
-
- /**
- * Called while processing dht tx prepare request.
- *
- * @param ldr Loader.
- * @param nodeId Sender node ID.
- * @param req Request.
- * @return Remote transaction.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable public GridNearTxRemote<K, V> startNearRemoteTx(ClassLoader ldr, UUID nodeId,
- GridDhtTxPrepareRequest<K, V> req) throws IgniteCheckedException {
- assert F.isEmpty(req.candidatesByKey());
-
- if (!F.isEmpty(req.nearWrites())) {
- GridNearTxRemote<K, V> tx = ctx.tm().nearTx(req.version());
-
- if (tx == null) {
- tx = new GridNearTxRemote<>(
- ctx,
- ldr,
- nodeId,
- req.nearNodeId(),
- req.threadId(),
- req.version(),
- req.commitVersion(),
- req.system(),
- req.concurrency(),
- req.isolation(),
- req.isInvalidate(),
- req.timeout(),
- req.nearWrites(),
- req.txSize(),
- req.groupLockKey(),
- req.subjectId(),
- req.taskNameHash()
- );
-
- if (!tx.empty()) {
- tx = ctx.tm().onCreated(tx);
-
- if (tx == null || !ctx.tm().onStarted(tx))
- throw new IgniteTxRollbackException("Attempt to start a completed transaction: " + tx);
- }
- }
- else
- tx.addEntries(ldr, req.nearWrites());
-
- tx.ownedVersions(req.owned());
-
- // Prepare prior to reordering, so the pending locks added
- // in prepare phase will get properly ordered as well.
- tx.prepare();
-
- return tx;
- }
-
- return null;
- }
-
- /**
- * @param nodeId Primary node ID.
- * @param req Request.
- * @return Remote transaction.
- * @throws IgniteCheckedException If failed.
- * @throws GridDistributedLockCancelledException If lock has been cancelled.
- */
- @SuppressWarnings({"RedundantTypeArguments"})
- @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
- throws IgniteCheckedException, GridDistributedLockCancelledException {
-
- GridDhtTxRemote<K, V> tx = null;
-
- boolean marked = false;
-
- for (GridCacheTxEntry<K, V> txEntry : req.writes()) {
- GridDistributedCacheEntry<K, V> entry = null;
-
- GridCacheContext<K, V> cacheCtx = txEntry.context();
-
- while (true) {
- try {
- int part = cacheCtx.affinity().partition(txEntry.key());
-
- GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part,
- req.topologyVersion(), false);
-
- // Handle implicit locks for pessimistic transactions.
- if (tx == null)
- tx = ctx.tm().tx(req.version());
-
- if (locPart == null || !locPart.reserve()) {
- if (log.isDebugEnabled())
- log.debug("Local partition for given key is already evicted (will remove from tx) " +
- "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']');
-
- if (tx != null)
- tx.clearEntry(txEntry.txKey());
-
- break;
- }
-
- try {
- entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(),
- req.topologyVersion());
-
- if (tx == null) {
- tx = new GridDhtTxRemote<>(
- ctx,
- req.nearNodeId(),
- req.futureId(),
- nodeId,
- // We can pass null as nearXidVersion as transaction will be committed right away.
- null,
- req.threadId(),
- req.topologyVersion(),
- req.version(),
- /*commitVer*/null,
- req.system(),
- PESSIMISTIC,
- req.isolation(),
- req.isInvalidate(),
- 0,
- req.txSize(),
- req.groupLockKey(),
- req.subjectId(),
- req.taskNameHash());
-
- tx = ctx.tm().onCreated(tx);
-
- if (tx == null || !ctx.tm().onStarted(tx))
- throw new IgniteTxRollbackException("Failed to acquire lock " +
- "(transaction has been completed): " + req.version());
- }
-
- tx.addWrite(cacheCtx, txEntry.op(), txEntry.txKey(), txEntry.keyBytes(), txEntry.value(),
- txEntry.valueBytes(), txEntry.transformClosures(), txEntry.drVersion());
-
- if (!marked) {
- if (tx.markFinalizing(USER_FINISH))
- marked = true;
- else {
- tx.clearEntry(txEntry.txKey());
-
- return null;
- }
- }
-
- // Add remote candidate before reordering.
- if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
- entry.addRemote(
- req.nearNodeId(),
- nodeId,
- req.threadId(),
- req.version(),
- 0,
- /*tx*/true,
- tx.implicitSingle(),
- null
- );
-
- // Double-check in case if sender node left the grid.
- if (ctx.discovery().node(req.nearNodeId()) == null) {
- if (log.isDebugEnabled())
- log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
-
- tx.rollback();
-
- return null;
- }
-
- // Entry is legit.
- break;
- }
- finally {
- locPart.release();
- }
- }
- catch (GridCacheEntryRemovedException ignored) {
- assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
- entry;
-
- if (log.isDebugEnabled())
- log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
-
- tx.clearEntry(txEntry.txKey());
-
- if (log.isDebugEnabled())
- log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
- entry + ", tx=" + tx + ']');
- }
- catch (GridDhtInvalidPartitionException p) {
- if (log.isDebugEnabled())
- log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" +
- req + ", txEntry=" + txEntry + ']');
-
- if (tx != null)
- tx.clearEntry(txEntry.txKey());
-
- break;
- }
- }
- }
-
- if (tx != null && tx.empty()) {
- tx.rollback();
-
- return null;
- }
-
- return tx;
- }
-
- /**
- * @param nodeId Primary node ID.
- * @param req Request.
- * @return Remote transaction.
- * @throws IgniteCheckedException If failed.
- * @throws GridDistributedLockCancelledException If lock has been cancelled.
- */
- @SuppressWarnings({"RedundantTypeArguments"})
- @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
- throws IgniteCheckedException, GridDistributedLockCancelledException {
- assert req.groupLock();
-
- GridNearTxRemote<K, V> tx = null;
-
- ClassLoader ldr = ctx.deploy().globalLoader();
-
- if (ldr != null) {
- boolean marked = false;
-
- for (GridCacheTxEntry<K, V> txEntry : req.nearWrites()) {
- GridDistributedCacheEntry<K, V> entry = null;
-
- GridCacheContext<K, V> cacheCtx = txEntry.context();
-
- while (true) {
- try {
- entry = cacheCtx.near().peekExx(txEntry.key());
-
- if (entry != null) {
- entry.keyBytes(txEntry.keyBytes());
-
- // Handle implicit locks for pessimistic transactions.
- if (tx == null)
- tx = ctx.tm().nearTx(req.version());
-
- if (tx == null) {
- tx = new GridNearTxRemote<>(
- ctx,
- nodeId,
- req.nearNodeId(),
- // We can pass null as nearXidVer as transaction will be committed right away.
- null,
- req.threadId(),
- req.version(),
- null,
- req.system(),
- PESSIMISTIC,
- req.isolation(),
- req.isInvalidate(),
- 0,
- req.txSize(),
- req.groupLockKey(),
- req.subjectId(),
- req.taskNameHash());
-
- tx = ctx.tm().onCreated(tx);
-
- if (tx == null || !ctx.tm().onStarted(tx))
- throw new IgniteTxRollbackException("Failed to acquire lock " +
- "(transaction has been completed): " + req.version());
-
- if (!marked)
- marked = tx.markFinalizing(USER_FINISH);
-
- if (!marked)
- return null;
- }
-
- if (tx.local())
- return null;
-
- if (!marked)
- marked = tx.markFinalizing(USER_FINISH);
-
- if (marked)
- tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(),
- txEntry.valueBytes(), txEntry.drVersion());
- else
- return null;
-
- if (req.groupLock()) {
- tx.markGroupLock();
-
- if (!txEntry.groupLockEntry())
- tx.groupLockKey(txEntry.txKey());
- }
-
- // Add remote candidate before reordering.
- if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
- entry.addRemote(
- req.nearNodeId(),
- nodeId,
- req.threadId(),
- req.version(),
- 0,
- /*tx*/true,
- tx.implicitSingle(),
- null
- );
- }
-
- // Double-check in case if sender node left the grid.
- if (ctx.discovery().node(req.nearNodeId()) == null) {
- if (log.isDebugEnabled())
- log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
-
- if (tx != null)
- tx.rollback();
-
- return null;
- }
-
- // Entry is legit.
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
- entry;
-
- if (log.isDebugEnabled())
- log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
-
- if (tx != null) {
- tx.clearEntry(txEntry.txKey());
-
- if (log.isDebugEnabled())
- log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
- entry + ", tx=" + tx + ']');
- }
-
- // Will retry in while loop.
- }
- }
- }
- }
- else {
- String err = "Failed to acquire deployment class loader for message: " + req;
-
- U.warn(log, err);
-
- throw new IgniteCheckedException(err);
- }
-
- return tx;
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest<K, V> req) {
- if (log.isDebugEnabled())
- log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
-
- boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
-
- GridCacheOptimisticCheckPreparedTxResponse<K, V> res =
- new GridCacheOptimisticCheckPreparedTxResponse<>(req.version(), req.futureId(), req.miniId(), prepared);
-
- try {
- if (log.isDebugEnabled())
- log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
- ctx.io().send(nodeId, res);
- }
- catch (ClusterTopologyException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send check prepared transaction response (did node leave grid?) [nodeId=" +
- nodeId + ", res=" + res + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheOptimisticCheckPreparedTxResponse<K, V> res) {
- if (log.isDebugEnabled())
- log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
- GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = (GridCacheOptimisticCheckPreparedTxFuture<K, V>)ctx.mvcc().
- <Boolean>future(res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * @param nodeId Node ID.
- * @param req Request.
- */
- protected void processCheckCommittedTxRequest(final UUID nodeId,
- final GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
- if (log.isDebugEnabled())
- log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']');
-
- IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req);
-
- infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() {
- @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) {
- GridCacheCommittedTxInfo<K, V> info = null;
-
- try {
- info = infoFut.get();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e);
- }
-
- GridCachePessimisticCheckCommittedTxResponse<K, V>
- res = new GridCachePessimisticCheckCommittedTxResponse<>(
- req.version(), req.futureId(), req.miniId(), info);
-
- if (log.isDebugEnabled())
- log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']');
-
- sendCheckCommittedResponse(nodeId, res); }
- });
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- protected void processCheckCommittedTxResponse(UUID nodeId,
- GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
- if (log.isDebugEnabled())
- log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
- GridCachePessimisticCheckCommittedTxFuture<K, V> fut =
- (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future(
- res.version(), res.futureId());
-
- if (fut == null) {
- if (log.isDebugEnabled())
- log.debug("Received response for unknown future (will ignore): " + res);
-
- return;
- }
-
- fut.onResult(nodeId, res);
- }
-
- /**
- * Sends check committed response to remote node.
- *
- * @param nodeId Node ID to send to.
- * @param res Reponse to send.
- */
- private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
- ctx.io().send(nodeId, res);
- }
- catch (ClusterTopologyException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" +
- nodeId + ", res=" + res + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java
deleted file mode 100644
index d804023..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxKey.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.gridgain.grid.util.tostring.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction
- * for multiple caches.
- */
-public class GridCacheTxKey<K> implements Externalizable {
- /** Key. */
- @GridToStringInclude
- private K key;
-
- /** Cache ID. */
- private int cacheId;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridCacheTxKey() {
- // No-op.
- }
-
- /**
- * @param key User key.
- * @param cacheId Cache ID.
- */
- public GridCacheTxKey(K key, int cacheId) {
- this.key = key;
- this.cacheId = cacheId;
- }
-
- /**
- * @return User key.
- */
- public K key() {
- return key;
- }
-
- /**
- * @return Cache ID.
- */
- public int cacheId() {
- return cacheId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof GridCacheTxKey))
- return false;
-
- GridCacheTxKey that = (GridCacheTxKey)o;
-
- return cacheId == that.cacheId && key.equals(that.key);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = key.hashCode();
-
- res = 31 * res + cacheId;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(cacheId);
- out.writeObject(key);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheId = in.readInt();
- key = (K)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheTxKey.class, this);
- }
-}