You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:41 UTC
[39/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
deleted file mode 100644
index f81cdec..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
+++ /dev/null
@@ -1,1523 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.future.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
-import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
-import static org.apache.ignite.transactions.IgniteTxIsolation.*;
-import static org.apache.ignite.transactions.IgniteTxState.*;
-import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
-
-/**
- * Managed transaction adapter.
- */
-public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
- implements GridCacheTxEx<K, V>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Static logger to avoid re-creation. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Transaction ID. */
- @GridToStringInclude
- protected GridCacheVersion xidVer;
-
- /** Entries write version. */
- @GridToStringInclude
- protected GridCacheVersion writeVer;
-
- /** Implicit flag. */
- @GridToStringInclude
- protected boolean implicit;
-
- /** Implicit with one key flag. */
- @GridToStringInclude
- protected boolean implicitSingle;
-
- /** Local flag. */
- @GridToStringInclude
- protected boolean loc;
-
- /** Thread ID. */
- @GridToStringInclude
- protected long threadId;
-
- /** Transaction start time. */
- @GridToStringInclude
- protected long startTime = U.currentTimeMillis();
-
- /** Node ID. */
- @GridToStringInclude
- protected UUID nodeId;
-
- /** Transaction counter value at the start of transaction. */
- @GridToStringInclude
- protected GridCacheVersion startVer;
-
- /** Cache registry. */
- @GridToStringExclude
- protected GridCacheSharedContext<K, V> cctx;
-
- /**
- * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
- * assigned to this transaction at the end of write phase.
- */
- @GridToStringInclude
- protected GridCacheVersion endVer;
-
- /** Isolation. */
- @GridToStringInclude
- protected IgniteTxIsolation isolation = READ_COMMITTED;
-
- /** Concurrency. */
- @GridToStringInclude
- protected IgniteTxConcurrency concurrency = PESSIMISTIC;
-
- /** Transaction timeout. */
- @GridToStringInclude
- protected long timeout;
-
- /** Invalidate flag. */
- protected volatile boolean invalidate;
-
- /** Invalidation flag for system invalidations (not user-based ones). */
- private boolean sysInvalidate;
-
- /** Internal flag. */
- protected boolean internal;
-
- /** System transaction flag. */
- private boolean sys;
-
- /** */
- protected boolean onePhaseCommit;
-
- /** */
- protected boolean syncCommit;
-
- /** */
- protected boolean syncRollback;
-
- /** If this transaction contains transform entries. */
- protected boolean transform;
-
- /** Commit version. */
- private AtomicReference<GridCacheVersion> commitVer = new AtomicReference<>(null);
-
- /** Done marker. */
- protected final AtomicBoolean isDone = new AtomicBoolean(false);
-
- /** */
- private AtomicReference<FinalizationStatus> finalizing = new AtomicReference<>(NONE);
-
- /** Preparing flag. */
- private AtomicBoolean preparing = new AtomicBoolean();
-
- /** */
- private Set<Integer> invalidParts = new GridLeanSet<>();
-
- /** Recover writes. */
- private Collection<GridCacheTxEntry<K, V>> recoveryWrites;
-
- /**
- * Transaction state. Note that state is not protected, as we want to
- * always use {@link #state()} and {@link #state(IgniteTxState)}
- * methods.
- */
- @GridToStringInclude
- private volatile IgniteTxState state = ACTIVE;
-
- /** Timed out flag. */
- private volatile boolean timedOut;
-
- /** */
- protected int txSize;
-
- /** Group lock key, if any. */
- protected GridCacheTxKey grpLockKey;
-
- /** */
- @GridToStringExclude
- private AtomicReference<GridFutureAdapter<IgniteTx>> finFut = new AtomicReference<>();
-
- /** Topology version. */
- private AtomicLong topVer = new AtomicLong(-1);
-
- /** Mutex. */
- private final Lock lock = new ReentrantLock();
-
- /** Lock condition. */
- private final Condition cond = lock.newCondition();
-
- /** Subject ID initiated this transaction. */
- protected UUID subjId;
-
- /** Task name hash code. */
- protected int taskNameHash;
-
- /** Task name. */
- protected String taskName;
-
- /** Store used flag. */
- protected boolean storeEnabled = true;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- protected GridCacheTxAdapter() {
- // No-op.
- }
-
- /**
- * @param cctx Cache registry.
- * @param xidVer Transaction ID.
- * @param implicit Implicit flag.
- * @param implicitSingle Implicit with one key flag.
- * @param loc Local flag.
- * @param sys System transaction flag.
- * @param concurrency Concurrency.
- * @param isolation Isolation.
- * @param timeout Timeout.
- * @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
- */
- protected GridCacheTxAdapter(
- GridCacheSharedContext<K, V> cctx,
- GridCacheVersion xidVer,
- boolean implicit,
- boolean implicitSingle,
- boolean loc,
- boolean sys,
- IgniteTxConcurrency concurrency,
- IgniteTxIsolation isolation,
- long timeout,
- boolean invalidate,
- boolean storeEnabled,
- int txSize,
- @Nullable GridCacheTxKey grpLockKey,
- @Nullable UUID subjId,
- int taskNameHash
- ) {
- assert xidVer != null;
- assert cctx != null;
-
- this.cctx = cctx;
- this.xidVer = xidVer;
- this.implicit = implicit;
- this.implicitSingle = implicitSingle;
- this.loc = loc;
- this.sys = sys;
- this.concurrency = concurrency;
- this.isolation = isolation;
- this.timeout = timeout;
- this.invalidate = invalidate;
- this.storeEnabled = storeEnabled;
- this.txSize = txSize;
- this.grpLockKey = grpLockKey;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
-
- startVer = cctx.versions().last();
-
- nodeId = cctx.discovery().localNode().id();
-
- threadId = Thread.currentThread().getId();
-
- log = U.logger(cctx.kernalContext(), logRef, this);
- }
-
- /**
- * @param cctx Cache registry.
- * @param nodeId Node ID.
- * @param xidVer Transaction ID.
- * @param startVer Start version mark.
- * @param threadId Thread ID.
- * @param sys System transaction flag.
- * @param concurrency Concurrency.
- * @param isolation Isolation.
- * @param timeout Timeout.
- * @param txSize Transaction size.
- * @param grpLockKey Group lock key if this is group-lock transaction.
- */
- protected GridCacheTxAdapter(
- GridCacheSharedContext<K, V> cctx,
- UUID nodeId,
- GridCacheVersion xidVer,
- GridCacheVersion startVer,
- long threadId,
- boolean sys,
- IgniteTxConcurrency concurrency,
- IgniteTxIsolation isolation,
- long timeout,
- int txSize,
- @Nullable GridCacheTxKey grpLockKey,
- @Nullable UUID subjId,
- int taskNameHash
- ) {
- this.cctx = cctx;
- this.nodeId = nodeId;
- this.threadId = threadId;
- this.xidVer = xidVer;
- this.startVer = startVer;
- this.sys = sys;
- this.concurrency = concurrency;
- this.isolation = isolation;
- this.timeout = timeout;
- this.txSize = txSize;
- this.grpLockKey = grpLockKey;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
-
- implicit = false;
- implicitSingle = false;
- loc = false;
-
- log = U.logger(cctx.kernalContext(), logRef, this);
- }
-
- /**
- * Acquires lock.
- */
- @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
- protected final void lock() {
- lock.lock();
- }
-
- /**
- * Releases lock.
- */
- protected final void unlock() {
- lock.unlock();
- }
-
- /**
- * Signals all waiters.
- */
- protected final void signalAll() {
- cond.signalAll();
- }
-
- /**
- * Waits for signal.
- *
- * @throws InterruptedException If interrupted.
- */
- protected final void awaitSignal() throws InterruptedException {
- cond.await();
- }
-
- /**
- * Checks whether near cache should be updated.
- *
- * @return Flag indicating whether near cache should be updated.
- */
- protected boolean updateNearCache(GridCacheContext<K, V> cacheCtx, K key, long topVer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridCacheTxEntry<K, V>> optimisticLockEntries() {
- assert optimistic();
-
- if (!groupLock())
- return writeEntries();
- else {
- if (!F.isEmpty(invalidParts)) {
- assert invalidParts.size() == 1 : "Only one partition expected for group lock transaction " +
- "[tx=" + this + ", invalidParts=" + invalidParts + ']';
- assert groupLockEntry() == null : "Group lock key should be rejected " +
- "[tx=" + this + ", groupLockEntry=" + groupLockEntry() + ']';
- assert F.isEmpty(writeMap()) : "All entries should be rejected for group lock transaction " +
- "[tx=" + this + ", writes=" + writeMap() + ']';
-
- return Collections.emptyList();
- }
-
- GridCacheTxEntry<K, V> grpLockEntry = groupLockEntry();
-
- assert grpLockEntry != null || (near() && !local()):
- "Group lock entry was not enlisted into transaction [tx=" + this +
- ", grpLockKey=" + groupLockKey() + ']';
-
- return grpLockEntry == null ?
- Collections.<GridCacheTxEntry<K,V>>emptyList() :
- Collections.singletonList(grpLockEntry);
- }
- }
-
- /**
- * @param recoveryWrites Recover write entries.
- */
- public void recoveryWrites(Collection<GridCacheTxEntry<K, V>> recoveryWrites) {
- this.recoveryWrites = recoveryWrites;
- }
-
- /**
- * @return Recover write entries.
- */
- @Override public Collection<GridCacheTxEntry<K, V>> recoveryWrites() {
- return recoveryWrites;
- }
-
- /** {@inheritDoc} */
- @Override public boolean storeEnabled() {
- return storeEnabled;
- }
-
- /**
- * @param storeEnabled Store enabled flag.
- */
- public void storeEnabled(boolean storeEnabled) {
- this.storeEnabled = storeEnabled;
- }
-
- /** {@inheritDoc} */
- @Override public boolean system() {
- return sys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean storeUsed() {
- return storeEnabled() && store() != null;
- }
-
- /**
- * Store manager for current transaction.
- *
- * @return Store manager.
- */
- protected GridCacheStoreManager<K, V> store() {
- if (!activeCacheIds().isEmpty()) {
- int cacheId = F.first(activeCacheIds());
-
- GridCacheStoreManager<K, V> store = cctx.cacheContext(cacheId).store();
-
- return store.configured() ? store : null;
- }
-
- return null;
- }
-
- /**
- * This method uses unchecked assignment to cast group lock key entry to transaction generic signature.
- *
- * @return Group lock tx entry.
- */
- @SuppressWarnings("unchecked")
- public GridCacheTxEntry<K, V> groupLockEntry() {
- return ((GridCacheTxAdapter)this).entry(groupLockKey());
- }
-
- /** {@inheritDoc} */
- @Override public UUID otherNodeId() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- if (subjId != null)
- return subjId;
-
- return originatingNodeId();
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public long topologyVersion() {
- long res = topVer.get();
-
- if (res == -1)
- return cctx.exchange().topologyVersion();
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public long topologyVersion(long topVer) {
- this.topVer.compareAndSet(-1, topVer);
-
- return this.topVer.get();
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasTransforms() {
- return transform;
- }
-
- /** {@inheritDoc} */
- @Override public boolean markPreparing() {
- return preparing.compareAndSet(false, true);
- }
-
- /**
- * @return {@code True} if marked.
- */
- @Override public boolean markFinalizing(FinalizationStatus status) {
- boolean res;
-
- switch (status) {
- case USER_FINISH:
- res = finalizing.compareAndSet(NONE, USER_FINISH);
-
- break;
-
- case RECOVERY_WAIT:
- finalizing.compareAndSet(NONE, RECOVERY_WAIT);
-
- FinalizationStatus cur = finalizing.get();
-
- res = cur == RECOVERY_WAIT || cur == RECOVERY_FINISH;
-
- break;
-
- case RECOVERY_FINISH:
- FinalizationStatus old = finalizing.get();
-
- res = old != USER_FINISH && finalizing.compareAndSet(old, status);
-
- break;
-
- default:
- throw new IllegalArgumentException("Cannot set finalization status: " + status);
-
- }
-
- if (res) {
- if (log.isDebugEnabled())
- log.debug("Marked transaction as finalized: " + this);
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Transaction was not marked finalized: " + this);
- }
-
- return res;
- }
-
- /**
- * @return Finalization status.
- */
- protected FinalizationStatus finalizationStatus() {
- return finalizing.get();
- }
-
- /**
- * @return {@code True} if transaction has at least one key enlisted.
- */
- public abstract boolean isStarted();
-
- /** {@inheritDoc} */
- @Override public boolean groupLock() {
- return grpLockKey != null;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheTxKey groupLockKey() {
- return grpLockKey;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return txSize;
- }
-
- /**
- * @return Logger.
- */
- protected IgniteLogger log() {
- return log;
- }
-
- /** {@inheritDoc} */
- @Override public boolean near() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean implicit() {
- return implicit;
- }
-
- /** {@inheritDoc} */
- @Override public boolean implicitSingle() {
- return implicitSingle;
- }
-
- /** {@inheritDoc} */
- @Override public boolean local() {
- return loc;
- }
-
- /** {@inheritDoc} */
- @Override public final boolean user() {
- return !implicit() && local() && !dht() && !internal();
- }
-
- /** {@inheritDoc} */
- @Override public boolean dht() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean colocated() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean replicated() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean enforceSerializable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncCommit() {
- return syncCommit;
- }
-
- /** {@inheritDoc} */
- @Override public boolean syncRollback() {
- return syncRollback;
- }
-
- /**
- * @param syncCommit Synchronous commit flag.
- */
- public void syncCommit(boolean syncCommit) {
- this.syncCommit = syncCommit;
- }
-
- /**
- * @param syncRollback Synchronous rollback flag.
- */
- public void syncRollback(boolean syncRollback) {
- this.syncRollback = syncRollback;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid xid() {
- return xidVer.asGridUuid();
- }
-
- /** {@inheritDoc} */
- @Override public Set<Integer> invalidPartitions() {
- return invalidParts;
- }
-
- /** {@inheritDoc} */
- @Override public void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int part) {
- invalidParts.add(part);
-
- if (log.isDebugEnabled())
- log.debug("Added invalid partition for transaction [part=" + part + ", tx=" + this + ']');
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion ownedVersion(GridCacheTxKey<K> key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
- }
-
- /**
- * Gets remaining allowed transaction time.
- *
- * @return Remaining transaction time.
- */
- @Override public long remainingTime() {
- if (timeout() <= 0)
- return -1;
-
- long timeLeft = timeout() - (U.currentTimeMillis() - startTime());
-
- if (timeLeft < 0)
- return 0;
-
- return timeLeft;
- }
-
- /**
- * @return Lock timeout.
- */
- protected long lockTimeout() {
- long timeout = remainingTime();
-
- return timeout < 0 ? 0 : timeout == 0 ? -1 : timeout;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion xidVersion() {
- return xidVer;
- }
-
- /** {@inheritDoc} */
- @Override public long threadId() {
- return threadId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxIsolation isolation() {
- return isolation;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxConcurrency concurrency() {
- return concurrency;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout() {
- return timeout;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout(long timeout) {
- if (isStarted())
- throw new IllegalStateException("Cannot change timeout after transaction has started: " + this);
-
- long old = this.timeout;
-
- this.timeout = timeout;
-
- return old;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean ownsLock(GridCacheEntryEx<K, V> entry) throws GridCacheEntryRemovedException {
- GridCacheContext<K, V> cacheCtx = entry.context();
-
- GridCacheTxEntry<K, V> txEntry = entry(entry.txKey());
-
- GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
-
- assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " +
- "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
-
- return local() && !cacheCtx.isDht() ?
- entry.lockedByThread(threadId()) || (explicit != null && entry.lockedBy(explicit)) :
- // If candidate is not there, then lock was explicit.
- // Otherwise, check if entry is owned by version.
- !entry.hasLockCandidate(xidVersion()) || entry.lockedBy(xidVersion());
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean ownsLockUnsafe(GridCacheEntryEx<K, V> entry) {
- GridCacheContext<K, V> cacheCtx = entry.context();
-
- GridCacheTxEntry<K, V> txEntry = entry(entry.txKey());
-
- GridCacheVersion explicit = txEntry == null ? null : txEntry.explicitVersion();
-
- assert !txEntry.groupLockEntry() || groupLock() : "Can not have group-locked tx entries in " +
- "non-group-lock transactions [txEntry=" + txEntry + ", tx=" + this + ']';
-
- return local() && !cacheCtx.isDht() ?
- entry.lockedByThreadUnsafe(threadId()) || (explicit != null && entry.lockedByUnsafe(explicit)) :
- // If candidate is not there, then lock was explicit.
- // Otherwise, check if entry is owned by version.
- !entry.hasLockCandidateUnsafe(xidVersion()) || entry.lockedByUnsafe(xidVersion());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxState state() {
- return state;
- }
-
- /** {@inheritDoc} */
- @Override public boolean setRollbackOnly() {
- return state(MARKED_ROLLBACK);
- }
-
- /**
- * @return {@code True} if rollback only flag is set.
- */
- @Override public boolean isRollbackOnly() {
- return state == MARKED_ROLLBACK || state == ROLLING_BACK || state == ROLLED_BACK;
- }
-
- /** {@inheritDoc} */
- @Override public boolean done() {
- return isDone.get();
- }
-
- /**
- * @return Commit version.
- */
- @Override public GridCacheVersion commitVersion() {
- initCommitVersion();
-
- return commitVer.get();
- }
-
- /**
- * @param commitVer Commit version.
- * @return {@code True} if set to not null value.
- */
- @Override public boolean commitVersion(GridCacheVersion commitVer) {
- return commitVer != null && this.commitVer.compareAndSet(null, commitVer);
- }
-
- /**
- *
- */
- public void initCommitVersion() {
- if (commitVer.get() == null)
- commitVer.compareAndSet(null, xidVer);
- }
-
- /**
- *
- */
- @Override public void close() throws IgniteCheckedException {
- IgniteTxState state = state();
-
- if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)
- rollback();
-
- awaitCompletion();
- }
-
- /** {@inheritDoc} */
- @Override public boolean needsCompletedVersions() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
- Collection<GridCacheVersion> txs) {
- /* No-op. */
- }
-
- /**
- * Awaits transaction completion.
- *
- * @throws IgniteCheckedException If waiting failed.
- */
- protected void awaitCompletion() throws IgniteCheckedException {
- lock();
-
- try {
- while (!done())
- awaitSignal();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- if (!done())
- throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e);
- }
- finally {
- unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean internal() {
- return internal;
- }
-
- /**
- * @param key Key.
- * @return {@code True} if key is internal.
- */
- protected boolean checkInternal(GridCacheTxKey<K> key) {
- if (key.key() instanceof GridCacheInternal) {
- internal = true;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @param onePhaseCommit {@code True} if transaction commit should be performed in short-path way.
- */
- public void onePhaseCommit(boolean onePhaseCommit) {
- this.onePhaseCommit = onePhaseCommit;
- }
-
- /**
- * @return Fast commit flag.
- */
- @Override public boolean onePhaseCommit() {
- return onePhaseCommit;
- }
-
- /** {@inheritDoc} */
- @Override public boolean optimistic() {
- return concurrency == OPTIMISTIC;
- }
-
- /** {@inheritDoc} */
- @Override public boolean pessimistic() {
- return concurrency == PESSIMISTIC;
- }
-
- /** {@inheritDoc} */
- @Override public boolean serializable() {
- return isolation == SERIALIZABLE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean repeatableRead() {
- return isolation == REPEATABLE_READ;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readCommitted() {
- return isolation == READ_COMMITTED;
- }
-
- /** {@inheritDoc} */
- @Override public boolean state(IgniteTxState state) {
- return state(state, false);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- @Override public IgniteFuture<IgniteTx> finishFuture() {
- GridFutureAdapter<IgniteTx> fut = finFut.get();
-
- if (fut == null) {
- fut = new GridFutureAdapter<IgniteTx>(cctx.kernalContext()) {
- @Override public String toString() {
- return S.toString(GridFutureAdapter.class, this, "tx", GridCacheTxAdapter.this);
- }
- };
-
- if (!finFut.compareAndSet(null, fut))
- fut = finFut.get();
- }
-
- assert fut != null;
-
- if (isDone.get())
- fut.onDone(this);
-
- return fut;
- }
-
- /**
- *
- * @param state State to set.
- * @param timedOut Timeout flag.
- * @return {@code True} if state changed.
- */
- @SuppressWarnings({"TooBroadScope"})
- private boolean state(IgniteTxState state, boolean timedOut) {
- boolean valid = false;
-
- IgniteTxState prev;
-
- boolean notify = false;
-
- lock();
-
- try {
- prev = this.state;
-
- switch (state) {
- case ACTIVE: {
- valid = false;
-
- break;
- } // Active is initial state and cannot be transitioned to.
- case PREPARING: {
- valid = prev == ACTIVE;
-
- break;
- }
- case PREPARED: {
- valid = prev == PREPARING;
-
- break;
- }
- case COMMITTING: {
- valid = prev == PREPARED;
-
- break;
- }
-
- case UNKNOWN: {
- if (isDone.compareAndSet(false, true))
- notify = true;
-
- valid = prev == ROLLING_BACK || prev == COMMITTING;
-
- break;
- }
-
- case COMMITTED: {
- if (isDone.compareAndSet(false, true))
- notify = true;
-
- valid = prev == COMMITTING;
-
- break;
- }
-
- case ROLLED_BACK: {
- if (isDone.compareAndSet(false, true))
- notify = true;
-
- valid = prev == ROLLING_BACK;
-
- break;
- }
-
- case MARKED_ROLLBACK: {
- valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
-
- break;
- }
-
- case ROLLING_BACK: {
- valid =
- prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
- prev == PREPARED || (prev == COMMITTING && local() && !dht());
-
- break;
- }
- }
-
- if (valid) {
- this.state = state;
- this.timedOut = timedOut;
-
- if (log.isDebugEnabled())
- log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
-
- // Notify of state change.
- signalAll();
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Invalid transaction state transition [invalid=" + state + ", cur=" + this.state +
- ", tx=" + this + ']');
- }
- }
- finally {
- unlock();
- }
-
- if (notify) {
- GridFutureAdapter<IgniteTx> fut = finFut.get();
-
- if (fut != null)
- fut.onDone(this);
- }
-
- if (valid) {
- // Seal transactions maps.
- if (state != ACTIVE)
- seal();
-
- cctx.tm().onTxStateChange(prev, state, this);
- }
-
- return valid;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion startVersion() {
- return startVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion endVersion() {
- return endVer;
- }
-
- /** {@inheritDoc} */
- @Override public void endVersion(GridCacheVersion endVer) {
- this.endVer = endVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion writeVersion() {
- return writeVer == null ? commitVersion() : writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public void writeVersion(GridCacheVersion writeVer) {
- this.writeVer = writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid timeoutId() {
- return xidVer.asGridUuid();
- }
-
- /** {@inheritDoc} */
- @Override public long endTime() {
- long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout;
-
- return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- state(MARKED_ROLLBACK, true);
- }
-
- /** {@inheritDoc} */
- @Override public boolean timedOut() {
- return timedOut;
- }
-
- /** {@inheritDoc} */
- @Override public void invalidate(boolean invalidate) {
- if (isStarted() && !dht())
- throw new IllegalStateException("Cannot change invalidation flag after transaction has started: " + this);
-
- this.invalidate = invalidate;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isInvalidate() {
- return invalidate;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isSystemInvalidate() {
- return sysInvalidate;
- }
-
- /** {@inheritDoc} */
- @Override public void systemInvalidate(boolean sysInvalidate) {
- this.sysInvalidate = sysInvalidate;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public GridCacheVersion nearXidVersion() {
- return null;
- }
-
- /**
- * @param txEntry Entry to process.
- * @param metrics {@code True} if metrics should be updated.
- * @return Tuple containing transformation results.
- * @throws IgniteCheckedException If failed to get previous value for transform.
- * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
- */
- protected GridTuple3<GridCacheOperation, V, byte[]> applyTransformClosures(GridCacheTxEntry<K, V> txEntry,
- boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
- GridCacheContext cacheCtx = txEntry.context();
-
- assert cacheCtx != null;
-
- if (isSystemInvalidate())
- return F.t(cacheCtx.isStoreEnabled() ? RELOAD : DELETE, null, null);
- if (F.isEmpty(txEntry.transformClosures()))
- return F.t(txEntry.op(), txEntry.value(), txEntry.valueBytes());
- else {
- try {
- boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
-
- V val = txEntry.hasValue() ? txEntry.value() :
- txEntry.cached().innerGet(this,
- /*swap*/false,
- /*read through*/false,
- /*fail fast*/true,
- /*unmarshal*/true,
- /*metrics*/metrics,
- /*event*/recordEvt,
- /*temporary*/true,
- /*subjId*/subjId,
- /**closure name */recordEvt ? F.first(txEntry.transformClosures()) : null,
- resolveTaskName(),
- CU.<K, V>empty());
-
- try {
- for (IgniteClosure<V, V> clos : txEntry.transformClosures())
- val = clos.apply(val);
- }
- catch (Throwable e) {
- throw new IgniteException("Transform closure must not throw any exceptions " +
- "(transaction will be invalidated)", e);
- }
-
- GridCacheOperation op = val == null ? DELETE : UPDATE;
-
- return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null);
- }
- catch (GridCacheFilterFailedException e) {
- assert false : "Empty filter failed for innerGet: " + e;
-
- return null;
- }
- }
- }
-
- /**
- * @return Resolves task name.
- */
- public String resolveTaskName() {
- if (taskName != null)
- return taskName;
-
- return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash));
- }
-
- /**
- * @param e Transaction entry.
- * @param primaryOnly Flag to include backups into check or not.
- * @return {@code True} if entry is locally mapped as a primary or back up node.
- */
- protected boolean isNearLocallyMapped(GridCacheTxEntry<K, V> e, boolean primaryOnly) {
- GridCacheContext<K, V> cacheCtx = e.context();
-
- if (!cacheCtx.isNear())
- return false;
-
- // Try to take either entry-recorded primary node ID,
- // or transaction node ID from near-local transactions.
- UUID nodeId = e.nodeId() == null ? local() ? this.nodeId : null : e.nodeId();
-
- if (nodeId != null && nodeId.equals(cctx.localNodeId()))
- return true;
-
- GridCacheEntryEx<K, V> cached = e.cached();
-
- int part = cached != null ? cached.partition() : cacheCtx.affinity().partition(e.key());
-
- List<ClusterNode> affNodes = cacheCtx.affinity().nodes(part, topologyVersion());
-
- e.locallyMapped(F.contains(affNodes, cctx.localNode()));
-
- if (primaryOnly) {
- ClusterNode primary = F.first(affNodes);
-
- if (primary == null && !isAffinityNode(cacheCtx.config()))
- return false;
-
- assert primary != null : "Primary node is null for affinity nodes: " + affNodes;
-
- return primary.isLocal();
- }
- else
- return e.locallyMapped();
- }
-
- /**
- * @param e Entry to evict if it qualifies for eviction.
- * @param primaryOnly Flag to try to evict only on primary node.
- * @return {@code True} if attempt was made to evict the entry.
- * @throws IgniteCheckedException If failed.
- */
- protected boolean evictNearEntry(GridCacheTxEntry<K, V> e, boolean primaryOnly) throws IgniteCheckedException {
- assert e != null;
-
- if (isNearLocallyMapped(e, primaryOnly)) {
- GridCacheEntryEx<K, V> cached = e.cached();
-
- assert cached instanceof GridNearCacheEntry : "Invalid cache entry: " + e;
-
- if (log.isDebugEnabled())
- log.debug("Evicting dht-local entry from near cache [entry=" + cached + ", tx=" + this + ']');
-
- if (cached != null && cached.markObsolete(xidVer))
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- writeExternalMeta(out);
-
- out.writeObject(xidVer);
- out.writeBoolean(invalidate);
- out.writeLong(timeout);
- out.writeLong(threadId);
- out.writeLong(startTime);
-
- U.writeUuid(out, nodeId);
-
- out.write(isolation.ordinal());
- out.write(concurrency.ordinal());
- out.write(state().ordinal());
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- readExternalMeta(in);
-
- xidVer = (GridCacheVersion)in.readObject();
- invalidate = in.readBoolean();
- timeout = in.readLong();
- threadId = in.readLong();
- startTime = in.readLong();
-
- nodeId = U.readUuid(in);
-
- isolation = IgniteTxIsolation.fromOrdinal(in.read());
- concurrency = IgniteTxConcurrency.fromOrdinal(in.read());
-
- state = IgniteTxState.fromOrdinal(in.read());
- }
-
- /**
- * Reconstructs object on unmarshalling.
- *
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
- */
- protected Object readResolve() throws ObjectStreamException {
- return new TxShadow(
- xidVer.asGridUuid(),
- nodeId,
- threadId,
- startTime,
- isolation,
- concurrency,
- invalidate,
- implicit,
- timeout,
- state(),
- isRollbackOnly()
- );
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- return o == this || (o instanceof GridCacheTxAdapter && xidVer.equals(((GridCacheTxAdapter)o).xidVer));
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return xidVer.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return GridToStringBuilder.toString(GridCacheTxAdapter.class, this,
- "duration", (U.currentTimeMillis() - startTime) + "ms", "grpLock", groupLock(),
- "onePhaseCommit", onePhaseCommit);
- }
-
- /**
- * Transaction shadow class to be used for deserialization.
- */
- private static class TxShadow extends GridMetadataAwareAdapter implements IgniteTx {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Xid. */
- private final IgniteUuid xid;
-
- /** Node ID. */
- private final UUID nodeId;
-
- /** Thread ID. */
- private final long threadId;
-
- /** Start time. */
- private final long startTime;
-
- /** Transaction isolation. */
- private final IgniteTxIsolation isolation;
-
- /** Concurrency. */
- private final IgniteTxConcurrency concurrency;
-
- /** Invalidate flag. */
- private final boolean invalidate;
-
- /** Timeout. */
- private final long timeout;
-
- /** State. */
- private final IgniteTxState state;
-
- /** Rollback only flag. */
- private final boolean rollbackOnly;
-
- /** Implicit flag. */
- private final boolean implicit;
-
- /**
- * @param xid Xid.
- * @param nodeId Node ID.
- * @param threadId Thread ID.
- * @param startTime Start time.
- * @param isolation Isolation.
- * @param concurrency Concurrency.
- * @param invalidate Invalidate flag.
- * @param implicit Implicit flag.
- * @param timeout Transaction timeout.
- * @param state Transaction state.
- * @param rollbackOnly Rollback-only flag.
- */
- TxShadow(IgniteUuid xid, UUID nodeId, long threadId, long startTime, IgniteTxIsolation isolation,
- IgniteTxConcurrency concurrency, boolean invalidate, boolean implicit, long timeout,
- IgniteTxState state, boolean rollbackOnly) {
- this.xid = xid;
- this.nodeId = nodeId;
- this.threadId = threadId;
- this.startTime = startTime;
- this.isolation = isolation;
- this.concurrency = concurrency;
- this.invalidate = invalidate;
- this.implicit = implicit;
- this.timeout = timeout;
- this.state = state;
- this.rollbackOnly = rollbackOnly;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid xid() {
- return xid;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public long threadId() {
- return threadId;
- }
-
- /** {@inheritDoc} */
- @Override public long startTime() {
- return startTime;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxIsolation isolation() {
- return isolation;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxConcurrency concurrency() {
- return concurrency;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isInvalidate() {
- return invalidate;
- }
-
- /** {@inheritDoc} */
- @Override public boolean implicit() {
- return implicit;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout() {
- return timeout;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTxState state() {
- return state;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isRollbackOnly() {
- return rollbackOnly;
- }
-
- /** {@inheritDoc} */
- @Override public long timeout(long timeout) {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public boolean setRollbackOnly() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public void commit() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<IgniteTx> commitAsync() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public void rollback() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- return this == o || o instanceof IgniteTx && xid.equals(((IgniteTx)o).xid());
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return xid.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TxShadow.class, this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
deleted file mode 100644
index 91b9cc0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEntry.java
+++ /dev/null
@@ -1,1059 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.cache;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
-
-/**
- * Transaction entry. Note that it is essential that this class does not override
- * {@link #equals(Object)} method, as transaction entries should use referential
- * equality.
- */
-public class GridCacheTxEntry<K, V> implements GridPeerDeployAware, Externalizable, IgniteOptimizedMarshallable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "AbbreviationUsage", "UnusedDeclaration"})
- private static Object GG_CLASS_ID;
-
- /** Owning transaction. */
- @GridToStringExclude
- private GridCacheTxEx<K, V> tx;
-
- /** Cache key. */
- @GridToStringInclude
- private K key;
-
- /** Key bytes. */
- private byte[] keyBytes;
-
- /** Cache ID. */
- private int cacheId;
-
- /** Transient tx key. */
- private GridCacheTxKey<K> txKey;
-
- /** Cache value. */
- @GridToStringInclude
- private TxEntryValueHolder<K, V> val = new TxEntryValueHolder<>();
-
- /** Visible value for peek. */
- @GridToStringInclude
- private TxEntryValueHolder<K, V> prevVal = new TxEntryValueHolder<>();
-
- /** Filter bytes. */
- private byte[] filterBytes;
-
- /** Transform. */
- @GridToStringInclude
- private Collection<IgniteClosure<V, V>> transformClosCol;
-
- /** Transform closure bytes. */
- @GridToStringExclude
- private byte[] transformClosBytes;
-
- /** Time to live. */
- private long ttl;
-
- /** DR expire time (explicit) */
- private long drExpireTime = -1L;
-
- /** Explicit lock version if there is one. */
- @GridToStringInclude
- private GridCacheVersion explicitVer;
-
- /** DHT version. */
- private transient volatile GridCacheVersion dhtVer;
-
- /** Put filters. */
- @GridToStringInclude
- private IgnitePredicate<GridCacheEntry<K, V>>[] filters;
-
- /** Flag indicating whether filters passed. Used for fast-commit transactions. */
- private boolean filtersPassed;
-
- /** Flag indicating that filter is set and can not be replaced. */
- private transient boolean filtersSet;
-
- /** Underlying cache entry. */
- private transient volatile GridCacheEntryEx<K, V> entry;
-
- /** Cache registry. */
- private transient GridCacheContext<K, V> ctx;
-
- /** Prepared flag to prevent multiple candidate add. */
- @SuppressWarnings({"TransientFieldNotInitialized"})
- private transient AtomicBoolean prepared = new AtomicBoolean();
-
- /** Lock flag for colocated cache. */
- private transient boolean locked;
-
- /** Assigned node ID (required only for partitioned cache). */
- private transient UUID nodeId;
-
- /** Flag if this node is a back up node. */
- private boolean locMapped;
-
- /** Group lock entry flag. */
- private boolean grpLock;
-
- /** Flag indicating if this entry should be transferred to remote node. */
- private boolean transferRequired;
-
- /** Deployment enabled flag. */
- private boolean depEnabled;
-
- /** Data center replication version. */
- private GridCacheVersion drVer;
-
- /**
- * Required by {@link Externalizable}
- */
- public GridCacheTxEntry() {
- /* No-op. */
- }
-
- /**
- * This constructor is meant for remote transactions.
- *
- * @param ctx Cache registry.
- * @param tx Owning transaction.
- * @param op Operation.
- * @param val Value.
- * @param ttl Time to live.
- * @param drExpireTime DR expire time.
- * @param entry Cache entry.
- * @param drVer Data center replication version.
- */
- public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op, V val,
- long ttl, long drExpireTime, GridCacheEntryEx<K, V> entry, @Nullable GridCacheVersion drVer) {
- assert ctx != null;
- assert tx != null;
- assert op != null;
- assert entry != null;
-
- this.ctx = ctx;
- this.tx = tx;
- this.val.value(op, val, false, false);
- this.entry = entry;
- this.ttl = ttl;
- this.drExpireTime = drExpireTime;
- this.drVer = drVer;
-
- key = entry.key();
- keyBytes = entry.keyBytes();
-
- cacheId = entry.context().cacheId();
-
- depEnabled = ctx.gridDeploy().enabled();
- }
-
- /**
- * This constructor is meant for local transactions.
- *
- * @param ctx Cache registry.
- * @param tx Owning transaction.
- * @param op Operation.
- * @param val Value.
- * @param transformClos Transform closure.
- * @param ttl Time to live.
- * @param entry Cache entry.
- * @param filters Put filters.
- * @param drVer Data center replication version.
- */
- public GridCacheTxEntry(GridCacheContext<K, V> ctx, GridCacheTxEx<K, V> tx, GridCacheOperation op,
- V val, IgniteClosure<V, V> transformClos, long ttl, GridCacheEntryEx<K,V> entry,
- IgnitePredicate<GridCacheEntry<K, V>>[] filters, GridCacheVersion drVer) {
- assert ctx != null;
- assert tx != null;
- assert op != null;
- assert entry != null;
-
- this.ctx = ctx;
- this.tx = tx;
- this.val.value(op, val, false, false);
- this.entry = entry;
- this.ttl = ttl;
- this.filters = filters;
- this.drVer = drVer;
-
- if (transformClos != null)
- addTransformClosure(transformClos);
-
- key = entry.key();
- keyBytes = entry.keyBytes();
-
- cacheId = entry.context().cacheId();
-
- depEnabled = ctx.gridDeploy().enabled();
- }
-
- /**
- * @return Cache context for this tx entry.
- */
- public GridCacheContext<K, V> context() {
- return ctx;
- }
-
- /**
- * @return Flag indicating if this entry is affinity mapped to the same node.
- */
- public boolean locallyMapped() {
- return locMapped;
- }
-
- /**
- * @param locMapped Flag indicating if this entry is affinity mapped to the same node.
- */
- public void locallyMapped(boolean locMapped) {
- this.locMapped = locMapped;
- }
-
- /**
- * @return {@code True} if this entry was added in group lock transaction and
- * this is not a group lock entry.
- */
- public boolean groupLockEntry() {
- return grpLock;
- }
-
- /**
- * @param grpLock {@code True} if this entry was added in group lock transaction and
- * this is not a group lock entry.
- */
- public void groupLockEntry(boolean grpLock) {
- this.grpLock = grpLock;
- }
-
- /**
- * @param transferRequired Sets flag indicating that transfer is required to remote node.
- */
- public void transferRequired(boolean transferRequired) {
- this.transferRequired = transferRequired;
- }
-
- /**
- * @return Flag indicating whether transfer is required to remote nodes.
- */
- public boolean transferRequired() {
- return transferRequired;
- }
-
- /**
- * @param ctx Context.
- * @return Clean copy of this entry.
- */
- public GridCacheTxEntry<K, V> cleanCopy(GridCacheContext<K, V> ctx) {
- GridCacheTxEntry<K, V> cp = new GridCacheTxEntry<>();
-
- cp.key = key;
- cp.cacheId = cacheId;
- cp.ctx = ctx;
-
- cp.val = new TxEntryValueHolder<>();
-
- cp.keyBytes = keyBytes;
- cp.filters = filters;
- cp.val.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
- cp.val.valueBytes(val.valueBytes());
- cp.transformClosCol = transformClosCol;
- cp.ttl = ttl;
- cp.drExpireTime = drExpireTime;
- cp.explicitVer = explicitVer;
- cp.grpLock = grpLock;
- cp.depEnabled = depEnabled;
- cp.drVer = drVer;
-
- return cp;
- }
-
- /**
- * @return Node ID.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return DHT version.
- */
- public GridCacheVersion dhtVersion() {
- return dhtVer;
- }
-
- /**
- * @param dhtVer DHT version.
- */
- public void dhtVersion(GridCacheVersion dhtVer) {
- this.dhtVer = dhtVer;
- }
-
- /**
- * @return {@code True} if tx entry was marked as locked.
- */
- public boolean locked() {
- return locked;
- }
-
- /**
- * Marks tx entry as locked.
- */
- public void markLocked() {
- locked = true;
- }
-
- /**
- * @param val Value to set.
- */
- void setAndMarkValid(V val) {
- setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue());
- }
-
- /**
- * @param op Operation.
- * @param val Value to set.
- */
- void setAndMarkValid(GridCacheOperation op, V val) {
- setAndMarkValid(op, val, this.val.hasWriteValue(), this.val.hasReadValue());
- }
-
- /**
- * @param op Operation.
- * @param val Value to set.
- * @param hasReadVal Has read value flag.
- * @param hasWriteVal Has write value flag.
- */
- void setAndMarkValid(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) {
- this.val.value(op, val, hasWriteVal, hasReadVal);
-
- markValid();
- }
-
- /**
- * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible
- * to further peek operations.
- */
- void markValid() {
- prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue());
- }
-
- /**
- * Marks entry as prepared.
- *
- * @return True if entry was marked prepared by this call.
- */
- boolean markPrepared() {
- return prepared.compareAndSet(false, true);
- }
-
- /**
- * @return Entry key.
- */
- public K key() {
- return key;
- }
-
- /**
- * @return Cache ID.
- */
- public int cacheId() {
- return cacheId;
- }
-
- /**
- * @return Tx key.
- */
- public GridCacheTxKey<K> txKey() {
- if (txKey == null)
- txKey = new GridCacheTxKey<>(key, cacheId);
-
- return txKey;
- }
-
- /**
- *
- * @return Key bytes.
- */
- @Nullable public byte[] keyBytes() {
- byte[] bytes = keyBytes;
-
- if (bytes == null && entry != null) {
- bytes = entry.keyBytes();
-
- keyBytes = bytes;
- }
-
- return bytes;
- }
-
- /**
- * @param keyBytes Key bytes.
- */
- public void keyBytes(byte[] keyBytes) {
- initKeyBytes(keyBytes);
- }
-
- /**
- * @return Underlying cache entry.
- */
- public GridCacheEntryEx<K, V> cached() {
- return entry;
- }
-
- /**
- * @param entry Cache entry.
- * @param keyBytes Key bytes, possibly {@code null}.
- */
- public void cached(GridCacheEntryEx<K,V> entry, @Nullable byte[] keyBytes) {
- assert entry != null;
-
- assert entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this +
- ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']';
-
- this.entry = entry;
-
- initKeyBytes(keyBytes);
- }
-
- /**
- * Initialized key bytes locally and on the underlying entry.
- *
- * @param bytes Key bytes to initialize.
- */
- private void initKeyBytes(@Nullable byte[] bytes) {
- if (bytes != null) {
- keyBytes = bytes;
-
- while (true) {
- try {
- if (entry != null)
- entry.keyBytes(bytes);
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- entry = ctx.cache().entryEx(key);
- }
- }
- }
- else if (entry != null) {
- bytes = entry.keyBytes();
-
- if (bytes != null)
- keyBytes = bytes;
- }
- }
-
- /**
- * @return Entry value.
- */
- @Nullable public V value() {
- return val.value();
- }
-
- /**
- * @return {@code True} if has value explicitly set.
- */
- public boolean hasValue() {
- return val.hasValue();
- }
-
- /**
- * @return {@code True} if has write value set.
- */
- public boolean hasWriteValue() {
- return val.hasWriteValue();
- }
-
- /**
- * @return {@code True} if has read value set.
- */
- public boolean hasReadValue() {
- return val.hasReadValue();
- }
-
- /**
- * @return Value visible for peek.
- */
- @Nullable public V previousValue() {
- return prevVal.value();
- }
-
- /**
- * @return {@code True} if has previous value explicitly set.
- */
- boolean hasPreviousValue() {
- return prevVal.hasValue();
- }
-
- /**
- * @return Previous operation to revert entry in case of filter failure.
- */
- @Nullable public GridCacheOperation previousOperation() {
- return prevVal.op();
- }
-
- /**
- * @return Value bytes.
- */
- @Nullable public byte[] valueBytes() {
- return val.valueBytes();
- }
-
- /**
- * @param valBytes Value bytes.
- */
- public void valueBytes(@Nullable byte[] valBytes) {
- val.valueBytes(valBytes);
- }
-
- /**
- * @return Time to live.
- */
- public long ttl() {
- return ttl;
- }
-
- /**
- * @param ttl Time to live.
- */
- public void ttl(long ttl) {
- this.ttl = ttl;
- }
-
- /**
- * @return DR expire time.
- */
- public long drExpireTime() {
- return drExpireTime;
- }
-
- /**
- * @param drExpireTime DR expire time.
- */
- public void drExpireTime(long drExpireTime) {
- this.drExpireTime = drExpireTime;
- }
-
- /**
- * @param val Entry value.
- * @param writeVal Write value flag.
- * @param readVal Read value flag.
- */
- public void value(@Nullable V val, boolean writeVal, boolean readVal) {
- this.val.value(this.val.op(), val, writeVal, readVal);
- }
-
- /**
- * Sets read value if this tx entrty does not have write value yet.
- *
- * @param val Read value to set.
- */
- public void readValue(@Nullable V val) {
- this.val.value(this.val.op(), val, false, true);
- }
-
- /**
- * @param transformClos Transform closure.
- */
- public void addTransformClosure(IgniteClosure<V, V> transformClos) {
- if (transformClosCol == null)
- transformClosCol = new LinkedList<>();
-
- transformClosCol.add(transformClos);
-
- // Must clear transform closure bytes since collection has changed.
- transformClosBytes = null;
-
- val.op(TRANSFORM);
- }
-
- /**
- * @return Collection of transform closures.
- */
- public Collection<IgniteClosure<V, V>> transformClosures() {
- return transformClosCol;
- }
-
- /**
- * @param transformClosCol Collection of transform closures.
- */
- public void transformClosures(@Nullable Collection<IgniteClosure<V, V>> transformClosCol) {
- this.transformClosCol = transformClosCol;
-
- // Must clear transform closure bytes since collection has changed.
- transformClosBytes = null;
- }
-
- /**
- * @return Cache operation.
- */
- public GridCacheOperation op() {
- return val.op();
- }
-
- /**
- * @param op Cache operation.
- */
- public void op(GridCacheOperation op) {
- val.op(op);
- }
-
- /**
- * @return {@code True} if read entry.
- */
- public boolean isRead() {
- return op() == READ;
- }
-
- /**
- * @param explicitVer Explicit version.
- */
- public void explicitVersion(GridCacheVersion explicitVer) {
- this.explicitVer = explicitVer;
- }
-
- /**
- * @return Explicit version.
- */
- public GridCacheVersion explicitVersion() {
- return explicitVer;
- }
-
- /**
- * @return DR version.
- */
- @Nullable public GridCacheVersion drVersion() {
- return drVer;
- }
-
- /**
- * @param drVer DR version.
- */
- public void drVersion(@Nullable GridCacheVersion drVer) {
- this.drVer = drVer;
- }
-
- /**
- * @return Put filters.
- */
- public IgnitePredicate<GridCacheEntry<K, V>>[] filters() {
- return filters;
- }
-
- /**
- * @param filters Put filters.
- */
- public void filters(IgnitePredicate<GridCacheEntry<K, V>>[] filters) {
- filterBytes = null;
-
- this.filters = filters;
- }
-
- /**
- * @return {@code True} if filters passed for fast-commit transactions.
- */
- public boolean filtersPassed() {
- return filtersPassed;
- }
-
- /**
- * @param filtersPassed {@code True} if filters passed for fast-commit transactions.
- */
- public void filtersPassed(boolean filtersPassed) {
- this.filtersPassed = filtersPassed;
- }
-
- /**
- * @return {@code True} if filters are set.
- */
- public boolean filtersSet() {
- return filtersSet;
- }
-
- /**
- * @param filtersSet {@code True} if filters are set and should not be replaced.
- */
- public void filtersSet(boolean filtersSet) {
- this.filtersSet = filtersSet;
- }
-
- /**
- * @param ctx Context.
- * @throws IgniteCheckedException If failed.
- */
- public void marshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
- // Do not serialize filters if they are null.
- if (depEnabled) {
- if (keyBytes == null)
- keyBytes = entry.getOrMarshalKeyBytes();
-
- if (transformClosBytes == null && transformClosCol != null)
- transformClosBytes = CU.marshal(ctx, transformClosCol);
-
- if (F.isEmptyOrNulls(filters))
- filterBytes = null;
- else if (filterBytes == null)
- filterBytes = CU.marshal(ctx, filters);
- }
-
- val.marshal(ctx, context(), depEnabled);
- }
-
- /**
- * Unmarshalls entry.
- *
- * @param ctx Cache context.
- * @param clsLdr Class loader.
- * @throws IgniteCheckedException If un-marshalling failed.
- */
- public void unmarshal(GridCacheSharedContext<K, V> ctx, boolean near, ClassLoader clsLdr) throws IgniteCheckedException {
- if (this.ctx == null) {
- GridCacheContext<K, V> cacheCtx = ctx.cacheContext(cacheId);
-
- if (cacheCtx.isNear() && !near)
- cacheCtx = cacheCtx.near().dht().context();
- else if (!cacheCtx.isNear() && near)
- cacheCtx = cacheCtx.dht().near().context();
-
- this.ctx = cacheCtx;
- }
-
- if (depEnabled) {
- // Don't unmarshal more than once by checking key for null.
- if (key == null)
- key = ctx.marshaller().unmarshal(keyBytes, clsLdr);
-
- // Unmarshal transform closure anyway if it exists.
- if (transformClosBytes != null && transformClosCol == null)
- transformClosCol = ctx.marshaller().unmarshal(transformClosBytes, clsLdr);
-
- if (filters == null && filterBytes != null) {
- filters = ctx.marshaller().unmarshal(filterBytes, clsLdr);
-
- if (filters == null)
- filters = CU.empty();
- }
- }
-
- val.unmarshal(this.ctx, clsLdr, depEnabled);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeBoolean(depEnabled);
-
- if (depEnabled) {
- U.writeByteArray(out, keyBytes);
- U.writeByteArray(out, transformClosBytes);
- U.writeByteArray(out, filterBytes);
- }
- else {
- out.writeObject(key);
- U.writeCollection(out, transformClosCol);
- U.writeArray(out, filters);
- }
-
- out.writeInt(cacheId);
-
- val.writeTo(out);
-
- out.writeLong(ttl);
- out.writeLong(drExpireTime);
-
- CU.writeVersion(out, explicitVer);
- out.writeBoolean(grpLock);
- CU.writeVersion(out, drVer);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"unchecked"})
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- depEnabled = in.readBoolean();
-
- if (depEnabled) {
- keyBytes = U.readByteArray(in);
- transformClosBytes = U.readByteArray(in);
- filterBytes = U.readByteArray(in);
- }
- else {
- key = (K)in.readObject();
- transformClosCol = U.readCollection(in);
- filters = U.readEntryFilterArray(in);
- }
-
- cacheId = in.readInt();
-
- val.readFrom(in);
-
- ttl = in.readLong();
- drExpireTime = in.readLong();
-
- explicitVer = CU.readVersion(in);
- grpLock = in.readBoolean();
- drVer = CU.readVersion(in);
- }
-
- /** {@inheritDoc} */
- @Override public Object ggClassId() {
- return GG_CLASS_ID;
- }
-
- /** {@inheritDoc} */
- @Override public Class<?> deployClass() {
- ClassLoader clsLdr = getClass().getClassLoader();
-
- V val = value();
-
- // First of all check classes that may be loaded by class loader other than application one.
- return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ?
- key.getClass() : val != null ? val.getClass() : getClass();
- }
-
- /** {@inheritDoc} */
- @Override public ClassLoader classLoader() {
- return deployClass().getClassLoader();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return GridToStringBuilder.toString(GridCacheTxEntry.class, this,
- "keyBytesSize", keyBytes == null ? "null" : Integer.toString(keyBytes.length),
- "xidVer", tx == null ? "null" : tx.xidVersion());
- }
-
- /**
- * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes.
- */
- private static class TxEntryValueHolder<K, V> {
- /** */
- @GridToStringInclude
- private V val;
-
- /** */
- @GridToStringExclude
- private byte[] valBytes;
-
- /** */
- @GridToStringInclude
- private GridCacheOperation op = NOOP;
-
- /** Flag indicating that value has been set for write. */
- private boolean hasWriteVal;
-
- /** Flag indicating that value has been set for read. */
- private boolean hasReadVal;
-
- /** Flag indicating that bytes were sent. */
- private boolean valBytesSent;
-
- /**
- * @param op Cache operation.
- * @param val Value.
- * @param hasWriteVal Write value presence flag.
- * @param hasReadVal Read value presence flag.
- */
- public void value(GridCacheOperation op, V val, boolean hasWriteVal, boolean hasReadVal) {
- if (hasReadVal && this.hasWriteVal)
- return;
-
- boolean clean = this.val != null;
-
- this.op = op;
- this.val = val;
-
- if (clean)
- valBytes = null;
-
- this.hasWriteVal = hasWriteVal || op == CREATE || op == UPDATE || op == DELETE;
- this.hasReadVal = hasReadVal || op == READ;
- }
-
- /**
- * @return {@code True} if has read or write value.
- */
- public boolean hasValue() {
- return hasWriteVal || hasReadVal;
- }
-
- /**
- * Gets stored value.
- *
- * @return Value.
- */
- public V value() {
- return val;
- }
-
- /**
- * @param val Stored value.
- */
- public void value(@Nullable V val) {
- boolean clean = this.val != null;
-
- this.val = val;
-
- if (clean)
- valBytes = null;
- }
-
- /**
- * Gets cache operation.
- *
- * @return Cache operation.
- */
- public GridCacheOperation op() {
- return op;
- }
-
- /**
- * Sets cache operation.
- *
- * @param op Cache operation.
- */
- public void op(GridCacheOperation op) {
- this.op = op;
- }
-
- /**
- * @return {@code True} if write value was set.
- */
- public boolean hasWriteValue() {
- return hasWriteVal;
- }
-
- /**
- * @return {@code True} if read value was set.
- */
- public boolean hasReadValue() {
- return hasReadVal;
- }
-
- /**
- * Sets value bytes.
- *
- * @param valBytes Value bytes to set.
- */
- public void valueBytes(@Nullable byte[] valBytes) {
- this.valBytes = valBytes;
- }
-
- /**
- * Gets value bytes.
- *
- * @return Value bytes.
- */
- public byte[] valueBytes() {
- return valBytes;
- }
-
- /**
- * @param ctx Cache context.
- * @param depEnabled Deployment enabled flag.
- * @throws IgniteCheckedException If marshaling failed.
- */
- public void marshal(GridCacheSharedContext<K, V> sharedCtx, GridCacheContext<K, V> ctx, boolean depEnabled)
- throws IgniteCheckedException {
- boolean valIsByteArr = val != null && val instanceof byte[];
-
- // Do not send write values to remote nodes.
- if (hasWriteVal && val != null && !valIsByteArr && valBytes == null &&
- (depEnabled || !ctx.isUnmarshalValues()))
- valBytes = CU.marshal(sharedCtx, val);
-
- valBytesSent = hasWriteVal && !valIsByteArr && valBytes != null && (depEnabled || !ctx.isUnmarshalValues());
- }
-
- /**
- * @param ctx Cache context.
- * @param ldr Class loader.
- * @param depEnabled Deployment enabled flag.
- * @throws IgniteCheckedException If unmarshalling failed.
- */
- public void unmarshal(GridCacheContext<K, V> ctx, ClassLoader ldr, boolean depEnabled) throws IgniteCheckedException {
- if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled))
- val = ctx.marshaller().unmarshal(valBytes, ldr);
- }
-
- /**
- * @param out Data output.
- * @throws IOException If failed.
- */
- public void writeTo(ObjectOutput out) throws IOException {
- out.writeBoolean(hasWriteVal);
- out.writeBoolean(valBytesSent);
-
- if (hasWriteVal) {
- if (valBytesSent)
- U.writeByteArray(out, valBytes);
- else {
- if (val != null && val instanceof byte[]) {
- out.writeBoolean(true);
-
- U.writeByteArray(out, (byte[])val);
- }
- else {
- out.writeBoolean(false);
-
- out.writeObject(val);
- }
- }
- }
-
- out.writeInt(op.ordinal());
- }
-
- /**
- * @param in Data input.
- * @throws IOException If failed.
- * @throws ClassNotFoundException If failed.
- */
- public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException {
- hasWriteVal = in.readBoolean();
- valBytesSent = in.readBoolean();
-
- if (hasWriteVal) {
- if (valBytesSent)
- valBytes = U.readByteArray(in);
- else
- val = in.readBoolean() ? (V)U.readByteArray(in) : (V)in.readObject();
- }
-
- op = fromOrdinal(in.readInt());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return "[op=" + op +", val=" + val + ", valBytesLen=" + (valBytes == null ? 0 : valBytes.length) + ']';
- }
- }
-}