You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/22 11:42:32 UTC
[30/50] [abbrv] incubator-ignite git commit: GG-9141 - Renaming.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cd46f567/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
new file mode 100644
index 0000000..4fc7140
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -0,0 +1,3179 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.portables.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
+import org.gridgain.grid.kernal.processors.cache.dr.*;
+import org.gridgain.grid.kernal.processors.dr.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.transactions.IgniteTxState.*;
+import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
+import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
+
+/**
+ * Transaction adapter for cache transactions.
+ */
+public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
+ implements IgniteTxLocalEx<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Per-transaction read map. */
+ @GridToStringExclude
+ protected Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> txMap;
+
+ /** Read view on transaction map. */
+ @GridToStringExclude
+ protected IgniteTxMap<K, V> readView;
+
+ /** Write view on transaction map. */
+ @GridToStringExclude
+ protected IgniteTxMap<K, V> writeView;
+
+ /** Minimal version encountered (either explicit lock or XID of this transaction). */
+ protected GridCacheVersion minVer;
+
+ /** Flag indicating with TM commit happened. */
+ protected AtomicBoolean doneFlag = new AtomicBoolean(false);
+
+ /** Committed versions, relative to base. */
+ private Collection<GridCacheVersion> committedVers = Collections.emptyList();
+
+ /** Rolled back versions, relative to base. */
+ private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
+
+ /** Base for completed versions. */
+ private GridCacheVersion completedBase;
+
+ /** Flag indicating partition lock in group lock transaction. */
+ private boolean partLock;
+
+ /** Flag indicating that transformed values should be sent to remote nodes. */
+ private boolean sndTransformedVals;
+
+ /** Commit error. */
+ protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
+
+ /** Active cache IDs. */
+ protected Set<Integer> activeCacheIds = new HashSet<>();
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ protected IgniteTxLocalAdapter() {
+ // No-op.
+ }
+
+ /**
+ * @param cctx Cache registry.
+ * @param xidVer Transaction ID.
+ * @param implicit {@code True} if transaction was implicitly started by the system,
+ * {@code false} if it was started explicitly by user.
+ * @param implicitSingle {@code True} if transaction is implicit with only one key.
+ * @param sys System flag.
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param txSize Expected transaction size.
+ * @param grpLockKey Group lock key if this is a group-lock transaction.
+ * @param partLock {@code True} if this is a group-lock transaction and lock is acquired for whole partition.
+ */
+ protected IgniteTxLocalAdapter(
+ GridCacheSharedContext<K, V> cctx,
+ GridCacheVersion xidVer,
+ boolean implicit,
+ boolean implicitSingle,
+ boolean sys,
+ IgniteTxConcurrency concurrency,
+ IgniteTxIsolation isolation,
+ long timeout,
+ boolean invalidate,
+ boolean storeEnabled,
+ int txSize,
+ @Nullable IgniteTxKey grpLockKey,
+ boolean partLock,
+ @Nullable UUID subjId,
+ int taskNameHash
+ ) {
+ super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
+ storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
+
+ assert !partLock || grpLockKey != null;
+
+ this.partLock = partLock;
+
+ minVer = xidVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID eventNodeId() {
+ return cctx.localNodeId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID originatingNodeId() {
+ return cctx.localNodeId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return txMap.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<UUID> masterNodeIds() {
+ return Collections.singleton(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean partitionLock() {
+ return partLock;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Throwable commitError() {
+ return commitErr.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commitError(Throwable e) {
+ commitErr.compareAndSet(null, e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
+ assert false;
+ return false;
+ }
+
+ /**
+ * Gets collection of active cache IDs for this transaction.
+ *
+ * @return Collection of active cache IDs.
+ */
+ @Override public Collection<Integer> activeCacheIds() {
+ return activeCacheIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStarted() {
+ return txMap != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWriteKey(IgniteTxKey<K> key) {
+ return writeView.containsKey(key);
+ }
+
+ /**
+ * @return Transaction read set.
+ */
+ @Override public Set<IgniteTxKey<K>> readSet() {
+ return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : readView.keySet();
+ }
+
+ /**
+ * @return Transaction write set.
+ */
+ @Override public Set<IgniteTxKey<K>> writeSet() {
+ return txMap == null ? Collections.<IgniteTxKey<K>>emptySet() : writeView.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed(IgniteTxKey<K> key) {
+ if (txMap == null)
+ return false;
+
+ IgniteTxEntry<K, V> e = txMap.get(key);
+
+ return e != null && e.op() == DELETE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap() {
+ return readView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : readView;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> writeMap() {
+ return writeView == null ? Collections.<IgniteTxKey<K>, IgniteTxEntry<K, V>>emptyMap() : writeView;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry<K, V>> allEntries() {
+ return txMap == null ? Collections.<IgniteTxEntry<K, V>>emptySet() : txMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry<K, V>> readEntries() {
+ return readView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : readView.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry<K, V>> writeEntries() {
+ return writeView == null ? Collections.<IgniteTxEntry<K, V>>emptyList() : writeView.values();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteTxEntry<K, V> entry(IgniteTxKey<K> key) {
+ return txMap == null ? null : txMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void seal() {
+ if (readView != null)
+ readView.seal();
+
+ if (writeView != null)
+ writeView.seal();
+ }
+
+ /**
+ * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
+ * to remote nodes.
+ */
+ public void sendTransformedValues(boolean snd) {
+ sndTransformedVals = snd;
+ }
+
+ /**
+ * @return {@code True} if should be committed after lock is acquired.
+ */
+ protected boolean commitAfterLock() {
+ return implicit() && (!dht() || colocated());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ @Nullable @Override public GridTuple<V> peek(
+ GridCacheContext<K, V> cacheCtx,
+ boolean failFast,
+ K key,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter
+ ) throws GridCacheFilterFailedException {
+ IgniteTxEntry<K, V> e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
+
+ if (e != null) {
+ // We should look at tx entry previous value. If this is a user peek then previous
+ // value is the same as value. If this is a filter evaluation peek then previous value holds
+ // value visible to filter while value contains value enlisted for write.
+ if (!F.isAll(e.cached().wrap(false), filter))
+ return e.hasPreviousValue() ? F.t(CU.<V>failed(failFast, e.previousValue())) : null;
+
+ return e.hasPreviousValue() ? F.t(e.previousValue()) : null;
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Boolean> loadMissing(
+ final GridCacheContext<K, V> cacheCtx,
+ boolean async,
+ final Collection<? extends K> keys,
+ boolean deserializePortable,
+ final IgniteBiInClosure<K, V> c
+ ) {
+ if (!async) {
+ try {
+ return new GridFinishedFuture<>(cctx.kernalContext(),
+ cacheCtx.store().loadAllFromStore(this, keys, c));
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(cctx.kernalContext(), e);
+ }
+ }
+ else
+ return cctx.kernalContext().closure().callLocalSafe(
+ new GPC<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ return cacheCtx.store().loadAllFromStore(IgniteTxLocalAdapter.this, keys, c);
+ }
+ },
+ true);
+ }
+
+ /**
+ * Gets minimum version present in transaction.
+ *
+ * @return Minimum versions.
+ */
+ @Override public GridCacheVersion minVersion() {
+ return minVer;
+ }
+
+ /**
+ * @throws IgniteCheckedException If prepare step failed.
+ */
+ @SuppressWarnings({"CatchGenericClass"})
+ public void userPrepare() throws IgniteCheckedException {
+ if (state() != PREPARING) {
+ if (timedOut())
+ throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+
+ IgniteTxState state = state();
+
+ setRollbackOnly();
+
+ throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
+ }
+
+ checkValid();
+
+ try {
+ cctx.tm().prepareTx(this);
+ }
+ catch (IgniteCheckedException e) {
+ throw e;
+ }
+ catch (Throwable e) {
+ setRollbackOnly();
+
+ throw new IgniteCheckedException("Transaction validation produced a runtime exception: " + this, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void commit() throws IgniteCheckedException {
+ try {
+ commitAsync().get();
+ }
+ finally {
+ cctx.tm().txContextReset();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepare() throws IgniteCheckedException {
+ prepareAsync().get();
+ }
+
+ /**
+ * Checks that locks are in proper state for commit.
+ *
+ * @param entry Cache entry to check.
+ */
+ private void checkCommitLocks(GridCacheEntryEx<K, V> entry) {
+ assert ownsLockUnsafe(entry) : "Lock is not owned for commit in PESSIMISTIC mode [entry=" + entry +
+ ", tx=" + this + ']';
+ }
+
+ /**
+ * Uncommits transaction by invalidating all of its entries.
+ */
+ @SuppressWarnings({"CatchGenericClass"})
+ private void uncommit() {
+ for (IgniteTxEntry<K, V> e : writeMap().values()) {
+ try {
+ GridCacheEntryEx<K, V> cacheEntry = e.cached();
+
+ if (e.op() != NOOP)
+ cacheEntry.invalidate(null, xidVer);
+ }
+ catch (Throwable t) {
+ U.error(log, "Failed to invalidate transaction entries while reverting a commit.", t);
+
+ break;
+ }
+ }
+
+ cctx.tm().uncommitTx(this);
+ }
+
+ /**
+ * Gets cache entry for given key.
+ *
+ * @param key Key.
+ * @return Cache entry.
+ */
+ protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key) {
+ return cacheCtx.cache().entryEx(key.key());
+ }
+
+ /**
+ * Gets cache entry for given key and topology version.
+ *
+ * @param key Key.
+ * @param topVer Topology version.
+ * @return Cache entry.
+ */
+ protected GridCacheEntryEx<K, V> entryEx(GridCacheContext<K, V> cacheCtx, IgniteTxKey<K> key, long topVer) {
+ return cacheCtx.cache().entryEx(key.key(), topVer);
+ }
+
+ /**
+ * Performs batch database operations. This commit must be called
+ * before {@link #userCommit()}. This way if there is a DB failure,
+ * cache transaction can still be rolled back.
+ *
+ * @param writeEntries Transaction write set.
+ * @throws IgniteCheckedException If batch update failed.
+ */
+ @SuppressWarnings({"CatchGenericClass"})
+ protected void batchStoreCommit(Iterable<IgniteTxEntry<K, V>> writeEntries) throws IgniteCheckedException {
+ GridCacheStoreManager<K, V> store = store();
+
+ if (store != null && storeEnabled() && (!internal() || groupLock()) && (near() || store.writeToStoreFromDht())) {
+ try {
+ if (writeEntries != null) {
+ Map<K, IgniteBiTuple<V, GridCacheVersion>> putMap = null;
+ List<K> rmvCol = null;
+
+ boolean skipNear = near() && store.writeToStoreFromDht();
+
+ for (IgniteTxEntry<K, V> e : writeEntries) {
+ if (skipNear && e.cached().isNear())
+ continue;
+
+ boolean intercept = e.context().config().getInterceptor() != null;
+
+ if (intercept || !F.isEmpty(e.transformClosures()))
+ e.cached().unswap(true, false);
+
+ GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(e, false);
+
+ GridCacheContext<K, V> cacheCtx = e.context();
+
+ GridCacheOperation op = res.get1();
+ K key = e.key();
+ V val = res.get2();
+ GridCacheVersion ver = writeVersion();
+
+ if (op == CREATE || op == UPDATE) {
+ // Batch-process all removes if needed.
+ if (rmvCol != null && !rmvCol.isEmpty()) {
+ store.removeAllFromStore(this, rmvCol);
+
+ // Reset.
+ rmvCol.clear();
+ }
+
+ if (intercept) {
+ V old = e.cached().rawGetOrUnmarshal(true);
+
+ val = (V)cacheCtx.config().getInterceptor().onBeforePut(key, old, val);
+
+ if (val == null)
+ continue;
+
+ val = cacheCtx.unwrapTemporary(val);
+ }
+
+ if (putMap == null)
+ putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+
+ putMap.put(key, F.t(val, ver));
+ }
+ else if (op == DELETE) {
+ // Batch-process all puts if needed.
+ if (putMap != null && !putMap.isEmpty()) {
+ store.putAllToStore(this, putMap);
+
+ // Reset.
+ putMap.clear();
+ }
+
+ if (intercept) {
+ V old = e.cached().rawGetOrUnmarshal(true);
+
+ IgniteBiTuple<Boolean, V> t = cacheCtx.config().<K, V>getInterceptor()
+ .onBeforeRemove(key, old);
+
+ if (cacheCtx.cancelRemove(t))
+ continue;
+ }
+
+ if (rmvCol == null)
+ rmvCol = new LinkedList<>();
+
+ rmvCol.add(key);
+ }
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring NOOP entry for batch store commit: " + e);
+ }
+
+ if (putMap != null && !putMap.isEmpty()) {
+ assert rmvCol == null || rmvCol.isEmpty();
+
+ // Batch put at the end of transaction.
+ store.putAllToStore(this, putMap);
+ }
+
+ if (rmvCol != null && !rmvCol.isEmpty()) {
+ assert putMap == null || putMap.isEmpty();
+
+ // Batch remove at the end of transaction.
+ store.removeAllFromStore(this, rmvCol);
+ }
+ }
+
+ // Commit while locks are held.
+ store.txEnd(this, true);
+ }
+ catch (IgniteCheckedException ex) {
+ commitError(ex);
+
+ setRollbackOnly();
+
+ // Safe to remove transaction from committed tx list because nothing was committed yet.
+ cctx.tm().removeCommittedTx(this);
+
+ throw ex;
+ }
+ catch (Throwable ex) {
+ commitError(ex);
+
+ setRollbackOnly();
+
+ // Safe to remove transaction from committed tx list because nothing was committed yet.
+ cctx.tm().removeCommittedTx(this);
+
+ throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CatchGenericClass"})
+ @Override public void userCommit() throws IgniteCheckedException {
+ IgniteTxState state = state();
+
+ if (state != COMMITTING) {
+ if (timedOut())
+ throw new IgniteTxTimeoutException("Transaction timed out: " + this);
+
+ setRollbackOnly();
+
+ throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
+ }
+
+ checkValid();
+
+ boolean empty = F.isEmpty(near() ? txMap : writeMap());
+
+ // Register this transaction as completed prior to write-phase to
+ // ensure proper lock ordering for removed entries.
+ // We add colocated transaction to committed set even if it is empty to correctly order
+ // locks on backup nodes.
+ if (!empty || colocated())
+ cctx.tm().addCommittedTx(this);
+
+ if (groupLock())
+ addGroupTxMapping(writeSet());
+
+ if (!empty) {
+ // We are holding transaction-level locks for entries here, so we can get next write version.
+ writeVersion(cctx.versions().next(topologyVersion()));
+
+ batchStoreCommit(writeMap().values());
+
+ try {
+ cctx.tm().txContext(this);
+
+ long topVer = topologyVersion();
+
+ /*
+ * Commit to cache. Note that for 'near' transaction we loop through all the entries.
+ */
+ for (IgniteTxEntry<K, V> txEntry : (near() ? allEntries() : writeEntries())) {
+ GridCacheContext<K, V> cacheCtx = txEntry.context();
+
+ GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
+
+ UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId();
+
+ try {
+ while (true) {
+ try {
+ GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+ // Must try to evict near entries before committing from
+ // transaction manager to make sure locks are held.
+ if (!evictNearEntry(txEntry, false)) {
+ if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) {
+ cached.markObsolete(xidVer);
+
+ break;
+ }
+
+ if (cached.detached())
+ break;
+
+ GridCacheEntryEx<K, V> nearCached = null;
+
+ boolean metrics = true;
+
+ if (updateNearCache(cacheCtx, txEntry.key(), topVer))
+ nearCached = cacheCtx.dht().near().peekEx(txEntry.key());
+ else if (cacheCtx.isNear() && txEntry.locallyMapped())
+ metrics = false;
+
+ boolean evt = !isNearLocallyMapped(txEntry, false);
+
+ // For near local transactions we must record DHT version
+ // in order to keep near entries on backup nodes until
+ // backup remote transaction completes.
+ if (cacheCtx.isNear())
+ ((GridNearCacheEntry<K, V>)cached).recordDhtVersion(txEntry.dhtVersion());
+
+ if (!F.isEmpty(txEntry.transformClosures()) || !F.isEmpty(txEntry.filters()))
+ txEntry.cached().unswap(true, false);
+
+ GridTuple3<GridCacheOperation, V, byte[]> res = applyTransformClosures(txEntry,
+ true);
+
+ GridCacheOperation op = res.get1();
+ V val = res.get2();
+ byte[] valBytes = res.get3();
+
+ // Preserve TTL if needed.
+ if (txEntry.ttl() < 0)
+ txEntry.ttl(cached.ttl());
+
+ // Deal with DR conflicts.
+ GridCacheVersion explicitVer = txEntry.drVersion() != null ?
+ txEntry.drVersion() : writeVersion();
+
+ GridDrResolveResult<V> drRes = cacheCtx.dr().resolveTx(cached,
+ txEntry,
+ explicitVer,
+ op,
+ val,
+ valBytes,
+ txEntry.ttl(),
+ txEntry.drExpireTime());
+
+ if (drRes != null) {
+ op = drRes.operation();
+ val = drRes.value();
+ valBytes = drRes.valueBytes();
+
+ if (drRes.isMerge())
+ explicitVer = writeVersion();
+ }
+ else
+ // Nullify explicit version so that innerSet/innerRemove will work as usual.
+ explicitVer = null;
+
+ if (sndTransformedVals || (drRes != null)) {
+ assert sndTransformedVals && cacheCtx.isReplicated() || (drRes != null);
+
+ txEntry.value(val, true, false);
+ txEntry.valueBytes(valBytes);
+ txEntry.op(op);
+ txEntry.transformClosures(null);
+ txEntry.drVersion(explicitVer);
+ }
+
+ if (op == CREATE || op == UPDATE) {
+ GridCacheUpdateTxResult<V> updRes = cached.innerSet(
+ this,
+ eventNodeId(),
+ txEntry.nodeId(),
+ val,
+ valBytes,
+ false,
+ false,
+ txEntry.ttl(),
+ evt,
+ metrics,
+ topVer,
+ txEntry.filters(),
+ cached.detached() ? DR_NONE : drType,
+ txEntry.drExpireTime(),
+ cached.isNear() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName());
+
+ if (nearCached != null && updRes.success())
+ nearCached.innerSet(
+ null,
+ eventNodeId(),
+ nodeId,
+ val,
+ valBytes,
+ false,
+ false,
+ txEntry.ttl(),
+ false,
+ metrics,
+ topVer,
+ CU.<K, V>empty(),
+ DR_NONE,
+ txEntry.drExpireTime(),
+ null,
+ CU.subjectId(this, cctx),
+ resolveTaskName());
+ }
+ else if (op == DELETE) {
+ GridCacheUpdateTxResult<V> updRes = cached.innerRemove(
+ this,
+ eventNodeId(),
+ txEntry.nodeId(),
+ false,
+ false,
+ evt,
+ metrics,
+ topVer,
+ txEntry.filters(),
+ cached.detached() ? DR_NONE : drType,
+ cached.isNear() ? null : explicitVer,
+ CU.subjectId(this, cctx),
+ resolveTaskName());
+
+ if (nearCached != null && updRes.success())
+ nearCached.innerRemove(
+ null,
+ eventNodeId(),
+ nodeId,
+ false,
+ false,
+ false,
+ metrics,
+ topVer,
+ CU.<K, V>empty(),
+ DR_NONE,
+ null,
+ CU.subjectId(this, cctx),
+ resolveTaskName());
+ }
+ else if (op == RELOAD) {
+ cached.innerReload(CU.<K, V>empty());
+
+ if (nearCached != null)
+ nearCached.innerReload(CU.<K, V>empty());
+ }
+ else if (op == READ) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring READ entry when committing: " + txEntry);
+ }
+ else {
+ assert !groupLock() || txEntry.groupLockEntry() || ownsLock(txEntry.cached()):
+ "Transaction does not own lock for group lock entry during commit [tx=" +
+ this + ", txEntry=" + txEntry + ']';
+
+ if (log.isDebugEnabled())
+ log.debug("Ignoring NOOP entry when committing: " + txEntry);
+ }
+ }
+
+ // Check commit locks after set, to make sure that
+ // we are not changing obsolete entries.
+ // (innerSet and innerRemove will throw an exception
+ // if an entry is obsolete).
+ if (txEntry.op() != READ && !txEntry.groupLockEntry())
+ checkCommitLocks(cached);
+
+ // Break out of while loop.
+ break;
+ }
+ // If entry cached within transaction got removed.
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
+
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey()), txEntry.keyBytes());
+ }
+ }
+ }
+ catch (Throwable ex) {
+ // We are about to initiate transaction rollback when tx has started to committing.
+ // Need to remove version from committed list.
+ cctx.tm().removeCommittedTx(this);
+
+ if (X.hasCause(ex, GridCacheIndexUpdateException.class) && cacheCtx.cache().isMongoDataCache()) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to update mongo document index (transaction entry will " +
+ "be ignored): " + txEntry);
+
+ // Set operation to NOOP.
+ txEntry.op(NOOP);
+
+ setRollbackOnly();
+
+ throw ex;
+ }
+ else {
+ IgniteCheckedException err = new IgniteTxHeuristicException("Failed to locally write to cache " +
+ "(all transaction entries will be invalidated, however there was a window when " +
+ "entries for this transaction were visible to others): " + this, ex);
+
+ U.error(log, "Heuristic transaction failure.", err);
+
+ commitErr.compareAndSet(null, err);
+
+ state(UNKNOWN);
+
+ try {
+ // Courtesy to minimize damage.
+ uncommit();
+ }
+ catch (Throwable ex1) {
+ U.error(log, "Failed to uncommit transaction: " + this, ex1);
+ }
+
+ throw err;
+ }
+ }
+ }
+ }
+ finally {
+ cctx.tm().txContextReset();
+ }
+ }
+ else {
+ GridCacheStoreManager<K, V> store = store();
+
+ if (store != null && (!internal() || groupLock())) {
+ try {
+ store.txEnd(this, true);
+ }
+ catch (IgniteCheckedException e) {
+ commitError(e);
+
+ setRollbackOnly();
+
+ cctx.tm().removeCommittedTx(this);
+
+ throw e;
+ }
+ }
+ }
+
+ // Do not unlock transaction entries if one-phase commit.
+ if (!onePhaseCommit()) {
+ if (doneFlag.compareAndSet(false, true)) {
+ // Unlock all locks.
+ cctx.tm().commitTx(this);
+
+ boolean needsCompletedVersions = needsCompletedVersions();
+
+ assert !needsCompletedVersions || completedBase != null;
+ assert !needsCompletedVersions || committedVers != null;
+ assert !needsCompletedVersions || rolledbackVers != null;
+ }
+ }
+ }
+
+ /**
+ * Commits transaction to transaction manager. Used for one-phase commit transactions only.
+ */
+ public void tmCommit() {
+ assert onePhaseCommit();
+
+ if (doneFlag.compareAndSet(false, true)) {
+ // Unlock all locks.
+ cctx.tm().commitTx(this);
+
+ state(COMMITTED);
+
+ boolean needsCompletedVersions = needsCompletedVersions();
+
+ assert !needsCompletedVersions || completedBase != null;
+ assert !needsCompletedVersions || committedVers != null;
+ assert !needsCompletedVersions || rolledbackVers != null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void completedVersions(
+ GridCacheVersion completedBase,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers) {
+ this.completedBase = completedBase;
+ this.committedVers = committedVers;
+ this.rolledbackVers = rolledbackVers;
+ }
+
+ /**
+ * @return Completed base for ordering.
+ */
+ public GridCacheVersion completedBase() {
+ return completedBase;
+ }
+
+ /**
+ * @return Committed versions.
+ */
+ public Collection<GridCacheVersion> committedVersions() {
+ return committedVers;
+ }
+
+ /**
+ * @return Rolledback versions.
+ */
+ public Collection<GridCacheVersion> rolledbackVersions() {
+ return rolledbackVers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void userRollback() throws IgniteCheckedException {
+ IgniteTxState state = state();
+
+ if (state != ROLLING_BACK && state != ROLLED_BACK) {
+ setRollbackOnly();
+
+ throw new IgniteCheckedException("Invalid transaction state for rollback [state=" + state + ", tx=" + this + ']',
+ commitErr.get());
+ }
+
+ if (doneFlag.compareAndSet(false, true)) {
+ try {
+ if (near())
+ // Must evict near entries before rolling back from
+ // transaction manager, so they will be removed from cache.
+ for (IgniteTxEntry<K, V> e : allEntries())
+ evictNearEntry(e, false);
+
+ cctx.tm().rollbackTx(this);
+
+ GridCacheStoreManager<K, V> store = store();
+
+ if (store != null && (near() || store.writeToStoreFromDht())) {
+ if (!internal() || groupLock())
+ store.txEnd(this, false);
+ }
+ }
+ catch (Error | IgniteCheckedException | RuntimeException e) {
+ U.addLastCause(e, commitErr.get(), log);
+
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Checks if there is a cached or swapped value for
+ * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+ *
+ *
+ * @param keys Key to enlist.
+ * @param cached Cached entry, if called from entry wrapper.
+ * @param map Return map.
+ * @param missed Map of missed keys.
+ * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
+ * @param deserializePortable Deserialize portable flag.
+ * @param filter Filter to test.
+ * @throws IgniteCheckedException If failed.
+ * @return Enlisted keys.
+ */
+ @SuppressWarnings({"RedundantTypeArguments"})
+ private Collection<K> enlistRead(
+ final GridCacheContext<K, V> cacheCtx,
+ Collection<? extends K> keys,
+ @Nullable GridCacheEntryEx<K, V> cached,
+ Map<K, V> map,
+ Map<K, GridCacheVersion> missed,
+ int keysCnt,
+ boolean deserializePortable,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+ assert !F.isEmpty(keys);
+ assert keysCnt == keys.size();
+ assert cached == null || F.first(keys).equals(cached.key());
+
+ cacheCtx.checkSecurity(GridSecurityPermission.CACHE_READ);
+
+ groupLockSanityCheck(cacheCtx, keys);
+
+ boolean single = keysCnt == 1;
+
+ Collection<K> lockKeys = null;
+
+ long topVer = topologyVersion();
+
+ // In this loop we cover only read-committed or optimistic transactions.
+ // Transactions that are pessimistic and not read-committed are covered
+ // outside of this loop.
+ for (K key : keys) {
+ if (key == null)
+ continue;
+
+ if (pessimistic() && !readCommitted())
+ addActiveCache(cacheCtx);
+
+ IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+ // Check write map (always check writes first).
+ IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+ // Either non-read-committed or there was a previous write.
+ if (txEntry != null) {
+ if (cacheCtx.isAll(txEntry.cached(), filter)) {
+ V val = txEntry.value();
+
+ // Read value from locked entry in group-lock transaction as well.
+ if (txEntry.hasValue()) {
+ if (!F.isEmpty(txEntry.transformClosures())) {
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ val = clos.apply(val);
+ }
+
+ if (val != null) {
+ V val0 = val;
+
+ if (cacheCtx.portableEnabled())
+ val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+ map.put(key, val0);
+ }
+ }
+ else {
+ assert txEntry.op() == TRANSFORM || (groupLock() && !txEntry.groupLockEntry());
+
+ while (true) {
+ try {
+ Object transformClo =
+ (txEntry.op() == TRANSFORM && cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.transformClosures()) : null;
+
+ val = txEntry.cached().innerGet(this,
+ /*swap*/true,
+ /*read-through*/false,
+ /*fail fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ filter);
+
+ if (val != null) {
+ if (!readCommitted())
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.transformClosures())) {
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ val = clos.apply(val);
+ }
+
+ V val0 = val;
+
+ if (cacheCtx.portableEnabled())
+ val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+ map.put(key, val0);
+ }
+ else
+ missed.put(key, txEntry.cached().version());
+
+ break;
+ }
+ catch (GridCacheFilterFailedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Filter validation failed for entry: " + txEntry);
+
+ if (!readCommitted())
+ txEntry.readValue(e.<V>value());
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer), txEntry.keyBytes());
+ }
+ }
+ }
+ }
+ }
+ // First time access within transaction.
+ else {
+ if (lockKeys == null)
+ lockKeys = single ? (Collection<K>)keys : new ArrayList<K>(keysCnt);
+
+ if (!single)
+ lockKeys.add(key);
+
+ while (true) {
+ GridCacheEntryEx<K, V> entry;
+
+ if (cached != null) {
+ entry = cached;
+
+ cached = null;
+ }
+ else
+ entry = entryEx(cacheCtx, txKey, topVer);
+
+ try {
+ GridCacheVersion ver = entry.version();
+
+ V val = null;
+
+ if (!pessimistic() || readCommitted() || groupLock()) {
+ // This call will check for filter.
+ val = entry.innerGet(this,
+ /*swap*/true,
+ /*no read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ filter);
+
+ if (val != null) {
+ V val0 = val;
+
+ if (cacheCtx.portableEnabled())
+ val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+ map.put(key, val0);
+ }
+ else
+ missed.put(key, ver);
+ }
+ else
+ // We must wait for the lock in pessimistic mode.
+ missed.put(key, ver);
+
+ if (!readCommitted()) {
+ txEntry = addEntry(READ, val, null, entry, -1, filter, true, -1L, -1L, null);
+
+ if (groupLock())
+ txEntry.groupLockEntry(true);
+
+ // As optimization, mark as checked immediately
+ // for non-pessimistic if value is not null.
+ if (val != null && !pessimistic())
+ txEntry.markValid();
+ }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignored) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ }
+ catch (GridCacheFilterFailedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Filter validation failed for entry: " + entry);
+
+ if (!readCommitted()) {
+ // Value for which failure occurred.
+ V val = e.<V>value();
+
+ txEntry = addEntry(READ, val, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L, null);
+
+ // Mark as checked immediately for non-pessimistic.
+ if (val != null && !pessimistic())
+ txEntry.markValid();
+ }
+
+ break; // While loop.
+ }
+ }
+ }
+ }
+
+ return lockKeys != null ? lockKeys : Collections.<K>emptyList();
+ }
+
+ /**
+ * Adds skipped key.
+ *
+ * @param skipped Skipped set (possibly {@code null}).
+ * @param key Key to add.
+ * @return Skipped set.
+ */
+ private Set<K> skip(Set<K> skipped, K key) {
+ if (skipped == null)
+ skipped = new GridLeanSet<>();
+
+ skipped.add(key);
+
+ if (log.isDebugEnabled())
+ log.debug("Added key to skipped set: " + key);
+
+ return skipped;
+ }
+
+ /**
+ * Loads all missed keys for
+ * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, IgnitePredicate[])} method.
+ *
+ * @param map Return map.
+ * @param missedMap Missed keys.
+ * @param redos Keys to retry.
+ * @param deserializePortable Deserialize portable flag.
+ * @param filter Filter.
+ * @return Loaded key-value pairs.
+ */
+ private IgniteFuture<Map<K, V>> checkMissed(
+ final GridCacheContext<K, V> cacheCtx,
+ final Map<K, V> map,
+ final Map<K, GridCacheVersion> missedMap,
+ @Nullable final Collection<K> redos,
+ final boolean deserializePortable,
+ final IgnitePredicate<GridCacheEntry<K, V>>[] filter
+ ) {
+ assert redos != null || pessimistic();
+
+ if (log.isDebugEnabled())
+ log.debug("Loading missed values for missed map: " + missedMap);
+
+ final Collection<K> loaded = new HashSet<>();
+
+ return new GridEmbeddedFuture<>(cctx.kernalContext(),
+ loadMissing(
+ cacheCtx,
+ false, missedMap.keySet(), deserializePortable, new CI2<K, V>() {
+ /** */
+ private GridCacheVersion nextVer;
+
+ @Override public void apply(K key, V val) {
+ if (isRollbackOnly()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring loaded value for read because transaction was rolled back: " +
+ IgniteTxLocalAdapter.this);
+
+ return;
+ }
+
+ GridCacheVersion ver = missedMap.get(key);
+
+ if (ver == null) {
+ if (log.isDebugEnabled())
+ log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
+
+ return;
+ }
+
+ V visibleVal = val;
+
+ IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+ IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+ if (txEntry != null) {
+ if (!readCommitted())
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.transformClosures())) {
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ visibleVal = clos.apply(visibleVal);
+ }
+ }
+
+ // In pessimistic mode we hold the lock, so filter validation
+ // should always be valid.
+ if (pessimistic())
+ ver = null;
+
+ // Initialize next version.
+ if (nextVer == null)
+ nextVer = cctx.versions().next(topologyVersion());
+
+ while (true) {
+ assert txEntry != null || readCommitted() || groupLock();
+
+ GridCacheEntryEx<K, V> e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+
+ try {
+ boolean pass = cacheCtx.isAll(e, filter);
+
+ // Must initialize to true since even if filter didn't pass,
+ // we still record the transaction value.
+ boolean set = true;
+
+ if (pass) {
+ try {
+ set = e.versionedValue(val, ver, nextVer);
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction getAll method " +
+ "(will try again): " + e);
+
+ if (pessimistic() && !readCommitted() && !isRollbackOnly() &&
+ (!groupLock() || F.eq(e.key(), groupLockKey()))) {
+ U.error(log, "Inconsistent transaction state (entry got removed while " +
+ "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
+
+ setRollbackOnly();
+
+ return;
+ }
+
+ if (txEntry != null)
+ txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+
+ continue; // While loop.
+ }
+ }
+
+ // In pessimistic mode, we should always be able to set.
+ assert set || !pessimistic();
+
+ if (readCommitted() || groupLock()) {
+ cacheCtx.evicts().touch(e, topologyVersion());
+
+ if (pass && visibleVal != null)
+ map.put(key, visibleVal);
+ }
+ else {
+ assert txEntry != null;
+
+ if (set || F.isEmptyOrNulls(filter)) {
+ txEntry.setAndMarkValid(val);
+
+ if (pass && visibleVal != null)
+ map.put(key, visibleVal);
+ }
+ else {
+ assert !pessimistic() : "Pessimistic transaction should not have to redo gets: " +
+ this;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to set versioned value for entry (will redo): " + e);
+
+ redos.add(key);
+ }
+ }
+
+ loaded.add(key);
+
+ if (log.isDebugEnabled())
+ log.debug("Set value loaded from store into entry from transaction [set=" + set +
+ ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+
+ break; // While loop.
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+ }
+ }
+ }
+ }),
+ new C2<Boolean, Exception, Map<K, V>>() {
+ @Override public Map<K, V> apply(Boolean b, Exception e) {
+ if (e != null) {
+ setRollbackOnly();
+
+ throw new GridClosureException(e);
+ }
+
+ if (!b && !readCommitted()) {
+ // There is no store - we must mark the entries.
+ for (K key : missedMap.keySet()) {
+ IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+
+ if (txEntry != null)
+ txEntry.markValid();
+ }
+ }
+
+ if (readCommitted()) {
+ Collection<K> notFound = new HashSet<>(missedMap.keySet());
+
+ notFound.removeAll(loaded);
+
+ // In read-committed mode touch entries that have just been read.
+ for (K key : notFound) {
+ IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(key));
+
+ GridCacheEntryEx<K, V> entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
+ txEntry.cached();
+
+ if (entry != null)
+ cacheCtx.evicts().touch(entry, topologyVersion());
+ }
+ }
+
+ return map;
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Map<K, V>> getAllAsync(
+ final GridCacheContext<K, V> cacheCtx,
+ Collection<? extends K> keys,
+ @Nullable GridCacheEntryEx<K, V> cached, final boolean deserializePortable,
+ final IgnitePredicate<GridCacheEntry<K, V>>[] filter) {
+ if (F.isEmpty(keys))
+ return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+
+ init();
+
+ int keysCnt = keys.size();
+
+ boolean single = keysCnt == 1;
+
+ try {
+ checkValid();
+
+ final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
+
+ final Map<K, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
+
+ final Collection<K> lockKeys = enlistRead(cacheCtx, keys, cached, retMap, missed, keysCnt,
+ deserializePortable, filter);
+
+ if (single && missed.isEmpty())
+ return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+
+ // Handle locks.
+ if (pessimistic() && !readCommitted() && !groupLock()) {
+ IgniteFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, lockTimeout(), this, true, true,
+ isolation, isInvalidate(), CU.<K, V>empty());
+
+ PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
+ @Override public IgniteFuture<Map<K, V>> postLock() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for read on keys: " + lockKeys);
+
+ // Load keys only after the locks have been acquired.
+ for (K key : lockKeys) {
+ if (retMap.containsKey(key))
+ // We already have a return value.
+ continue;
+
+ IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+ IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+ assert txEntry != null;
+
+ // Check if there is cached value.
+ while (true) {
+ GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+ try {
+ Object transformClo =
+ (!F.isEmpty(txEntry.transformClosures()) &&
+ cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
+ F.first(txEntry.transformClosures()) : null;
+
+ V val = cached.innerGet(IgniteTxLocalAdapter.this,
+ cacheCtx.isSwapOrOffheapEnabled(),
+ /*read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*events*/true,
+ /*temporary*/true,
+ CU.subjectId(IgniteTxLocalAdapter.this, cctx),
+ transformClo,
+ resolveTaskName(),
+ filter);
+
+ // If value is in cache and passed the filter.
+ if (val != null) {
+ missed.remove(key);
+
+ txEntry.setAndMarkValid(val);
+
+ if (!F.isEmpty(txEntry.transformClosures())) {
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ val = clos.apply(val);
+ }
+
+ if (cacheCtx.portableEnabled())
+ val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable);
+
+ retMap.put(key, val);
+ }
+
+ // Even though we bring the value back from lock acquisition,
+ // we still need to recheck primary node for consistent values
+ // in case of concurrent transactional locks.
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed exception in get postLock (will retry): " +
+ cached);
+
+ txEntry.cached(entryEx(cacheCtx, txKey), txEntry.keyBytes());
+ }
+ catch (GridCacheFilterFailedException e) {
+ // Failed value for the filter.
+ V val = e.value();
+
+ if (val != null) {
+ // If filter fails after lock is acquired, we don't reload,
+ // regardless if value is null or not.
+ missed.remove(key);
+
+ txEntry.setAndMarkValid(val);
+ }
+
+ break; // While.
+ }
+ }
+ }
+
+ if (!missed.isEmpty() && (cacheCtx.isReplicated() || cacheCtx.isLocal()))
+ return checkMissed(cacheCtx, retMap, missed, null, deserializePortable, filter);
+
+ return new GridFinishedFuture<>(cctx.kernalContext(), Collections.<K, V>emptyMap());
+ }
+ };
+
+ FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
+ @Override Map<K, V> finish(Map<K, V> loaded) {
+ retMap.putAll(loaded);
+
+ return retMap;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ IgniteFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
+
+ return fut1.isDone() ?
+ new GridFinishedFutureEx<>(finClos.apply(fut1.get(), null)) :
+ new GridEmbeddedFuture<>(cctx.kernalContext(), fut1, finClos);
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(cctx.kernalContext(), e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return plc2.apply(false, e);
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(cctx.kernalContext(), e1);
+ }
+ }
+ }
+ else {
+ return new GridEmbeddedFuture<>(
+ cctx.kernalContext(),
+ fut,
+ plc2,
+ finClos);
+ }
+ }
+ else {
+ assert optimistic() || readCommitted() || groupLock();
+
+ final Collection<K> redos = new LinkedList<>();
+
+ if (!missed.isEmpty()) {
+ if (!readCommitted())
+ for (Iterator<K> it = missed.keySet().iterator(); it.hasNext(); )
+ if (retMap.containsKey(it.next()))
+ it.remove();
+
+ if (missed.isEmpty())
+ return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+
+ return new GridEmbeddedFuture<>(
+ cctx.kernalContext(),
+ // First future.
+ checkMissed(cacheCtx, retMap, missed, redos, deserializePortable, filter),
+ // Closure that returns another future, based on result from first.
+ new PMC<Map<K, V>>() {
+ @Override public IgniteFuture<Map<K, V>> postMiss(Map<K, V> map) {
+ if (redos.isEmpty())
+ return new GridFinishedFuture<>(cctx.kernalContext(),
+ Collections.<K, V>emptyMap());
+
+ if (log.isDebugEnabled())
+ log.debug("Starting to future-recursively get values for keys: " + redos);
+
+ // Future recursion.
+ return getAllAsync(cacheCtx, redos, null, deserializePortable, filter);
+ }
+ },
+ // Finalize.
+ new FinishClosure<Map<K, V>>() {
+ @Override Map<K, V> finish(Map<K, V> loaded) {
+ for (Map.Entry<K, V> entry : loaded.entrySet()) {
+ IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(entry.getKey()));
+
+ V val = entry.getValue();
+
+ if (!readCommitted())
+ txEntry.readValue(val);
+
+ if (!F.isEmpty(txEntry.transformClosures())) {
+ for (IgniteClosure<V, V> clos : txEntry.transformClosures())
+ val = clos.apply(val);
+ }
+
+ retMap.put(entry.getKey(), val);
+ }
+
+ return retMap;
+ }
+ }
+ );
+ }
+
+ return new GridFinishedFuture<>(cctx.kernalContext(), retMap);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ setRollbackOnly();
+
+ return new GridFinishedFuture<>(cctx.kernalContext(), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<GridCacheReturn<V>> putAllAsync(
+ GridCacheContext<K, V> cacheCtx,
+ Map<? extends K, ? extends V> map,
+ boolean retval,
+ @Nullable GridCacheEntryEx<K, V> cached,
+ long ttl,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter
+ ) {
+ return putAllAsync0(cacheCtx, map, null, null, retval, cached, ttl, filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> putAllDrAsync(
+ GridCacheContext<K, V> cacheCtx,
+ Map<? extends K, GridCacheDrInfo<V>> drMap
+ ) {
+ return putAllAsync0(cacheCtx, null, null, drMap, false, null, -1, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<GridCacheReturn<V>> transformAllAsync(
+ GridCacheContext<K, V> cacheCtx,
+ @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> map,
+ boolean retval,
+ @Nullable GridCacheEntryEx<K, V> cached,
+ long ttl
+ ) {
+ return putAllAsync0(cacheCtx, null, map, null, retval, null, -1, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> removeAllDrAsync(
+ GridCacheContext<K, V> cacheCtx,
+ Map<? extends K, GridCacheVersion> drMap
+ ) {
+ return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
+ }
+
+ /**
+ * Checks filter for non-pessimistic transactions.
+ *
+ * @param cached Cached entry.
+ * @param filter Filter to check.
+ * @return {@code True} if passed or pessimistic.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean filter(GridCacheEntryEx<K, V> cached,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+ return pessimistic() || cached.context().isAll(cached, filter);
+ }
+
+ /**
+ * Internal routine for <tt>putAll(..)</tt>
+ *
+ * @param keys Keys to enlist.
+ * @param cached Cached entry.
+ * @param ttl Time to live for entry. If negative, leave unchanged.
+ * @param implicit Implicit flag.
+ * @param lookup Value lookup map ({@code null} for remove).
+ * @param transformMap Map with transform closures if this is a transform operation.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param enlisted Collection of keys enlisted into this transaction.
+ * @param drPutMap DR put map (optional).
+ * @param drRmvMap DR remove map (optional).
+ * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
+ */
+ protected IgniteFuture<Set<K>> enlistWrite(
+ GridCacheContext<K, V> cacheCtx,
+ Collection<? extends K> keys,
+ @Nullable GridCacheEntryEx<K, V> cached,
+ long ttl,
+ boolean implicit,
+ @Nullable Map<? extends K, ? extends V> lookup,
+ @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ boolean retval,
+ boolean lockOnly,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter,
+ final GridCacheReturn<V> ret,
+ Collection<K> enlisted,
+ @Nullable Map<? extends K, GridCacheDrInfo<V>> drPutMap,
+ @Nullable Map<? extends K, GridCacheVersion> drRmvMap
+ ) {
+ assert cached == null || keys.size() == 1;
+ assert cached == null || F.first(keys).equals(cached.key());
+
+ try {
+ addActiveCache(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(cctx.kernalContext(), e);
+ }
+
+ Set<K> skipped = null;
+
+ boolean rmv = lookup == null && transformMap == null;
+
+ try {
+ // Set transform flag for transaction.
+ if (transformMap != null)
+ transform = true;
+
+ groupLockSanityCheck(cacheCtx, keys);
+
+ for (K key : keys) {
+ V val = rmv || lookup == null ? null : lookup.get(key);
+ IgniteClosure<V, V> transformClo = transformMap == null ? null : transformMap.get(key);
+
+ GridCacheVersion drVer;
+ long drTtl;
+ long drExpireTime;
+
+ if (drPutMap != null) {
+ GridCacheDrInfo<V> info = drPutMap.get(key);
+
+ assert info != null;
+
+ drVer = info.version();
+ drTtl = info.ttl();
+ drExpireTime = info.expireTime();
+ }
+ else if (drRmvMap != null) {
+ assert drRmvMap.get(key) != null;
+
+ drVer = drRmvMap.get(key);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+ else {
+ drVer = null;
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
+
+ if (key == null)
+ continue;
+
+ if (!rmv && val == null && transformClo == null) {
+ skipped = skip(skipped, key);
+
+ continue;
+ }
+
+ if (cacheCtx.portableEnabled())
+ key = (K)cacheCtx.marshalToPortable(key);
+
+ IgniteTxKey<K> txKey = cacheCtx.txKey(key);
+
+ IgniteTxEntry<K, V> txEntry = entry(txKey);
+
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx<K, V> entry;
+
+ if (cached != null) {
+ entry = cached;
+
+ cached = null;
+ }
+ else {
+ entry = entryEx(cacheCtx, txKey, topologyVersion());
+
+ entry.unswap(true, false);
+ }
+
+ try {
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer))
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+
+ V old = null;
+
+ boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+
+ if (optimistic()) {
+ try {
+ //Should read through if filter is specified.
+ old = entry.innerGet(this,
+ /*swap*/false,
+ /*read-through*/readThrough,
+ /*fail-fast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ transformClo,
+ resolveTaskName(),
+ CU.<K, V>empty());
+ }
+ catch (GridCacheFilterFailedException e) {
+ e.printStackTrace();
+
+ assert false : "Empty filter failed: " + e;
+ }
+ }
+ else
+ old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+
+ if (!filter(entry, filter)) {
+ skipped = skip(skipped, key);
+
+ ret.set(old, false);
+
+ if (!readCommitted() && old != null) {
+ // Enlist failed filters as reads for non-read-committed mode,
+ // so future ops will get the same values.
+ txEntry = addEntry(READ, old, null, entry, -1, CU.<K, V>empty(), false, -1L, -1L,
+ null);
+
+ txEntry.markValid();
+ }
+
+ if (readCommitted() || old == null)
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ break; // While.
+ }
+
+ txEntry = addEntry(lockOnly ? NOOP : rmv ? DELETE : transformClo != null ? TRANSFORM :
+ old != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+ drExpireTime, drVer);
+
+ if (!implicit() && readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (groupLock() && !lockOnly)
+ txEntry.groupLockEntry(true);
+
+ enlisted.add(key);
+
+ if (!pessimistic() || (groupLock() && !lockOnly)) {
+ txEntry.markValid();
+
+ if (old == null) {
+ if (retval && !readThrough) {
+ // If return value is required, then we know for sure that there is only
+ // one key in the keys collection.
+ assert keys.size() == 1;
+
+ IgniteFuture<Boolean> fut = loadMissing(
+ cacheCtx,
+ true,
+ F.asList(key),
+ deserializePortables(cacheCtx),
+ new CI2<K, V>() {
+ @Override public void apply(K k, V v) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + k + ", val=" +
+ v + ']');
+
+ ret.set(v, true);
+ }
+ });
+
+ return new GridEmbeddedFuture<>(
+ cctx.kernalContext(),
+ fut,
+ new C2<Boolean, Exception, Set<K>>() {
+ @Override public Set<K> apply(Boolean b, Exception e) {
+ if (e != null)
+ throw new GridClosureException(e);
+
+ return Collections.emptySet();
+ }
+ }
+ );
+ }
+ else
+ ret.set(null, true);
+ }
+ else
+ ret.set(old, true);
+ }
+ // Pessimistic.
+ else
+ ret.set(old, true);
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ }
+ }
+ }
+ else {
+ if (transformClo == null && txEntry.op() == TRANSFORM)
+ throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+ "transaction after transform closure is applied): " + key);
+
+ GridCacheEntryEx<K, V> entry = txEntry.cached();
+
+ V v = txEntry.value();
+
+ boolean del = txEntry.op() == DELETE && rmv;
+
+ if (!del) {
+ if (!filter(entry, filter)) {
+ skipped = skip(skipped, key);
+
+ ret.set(v, false);
+
+ continue;
+ }
+
+ txEntry = addEntry(rmv ? DELETE : transformClo != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE, val, transformClo, entry, ttl, filter, true, drTtl,
+ drExpireTime, drVer);
+
+ enlisted.add(key);
+ }
+
+ if (!pessimistic()) {
+ txEntry.markValid();
+
+ // Set tx entry and return values.
+ ret.set(v, true);
+ }
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(cctx.kernalContext(), e);
+ }
+
+ return new GridFinishedFuture<>(cctx.kernalContext(), skipped);
+ }
+
+ /**
+ * Post lock processing for put or remove.
+ *
+ * @param keys Keys.
+ * @param failed Collection of potentially failed keys (need to populate in this method).
+ * @param transformed Output map where transformed values will be placed.
+ * @param transformMap Transform map.
+ * @param ret Return value.
+ * @param rmv {@code True} if remove.
+ * @param retval Flag to return value or not.
+ * @param filter Filter to check entries.
+ * @return Failed keys.
+ * @throws IgniteCheckedException If error.
+ */
+ protected Set<K> postLockWrite(
+ GridCacheContext<K, V> cacheCtx,
+ Iterable<? extends K> keys,
+ Set<K> failed,
+ @Nullable Map<K, V> transformed,
+ @Nullable Map<? extends K, ? extends IgniteClosure<V, V>> transformMap,
+ GridCacheReturn<V> ret,
+ boolean rmv,
+ boolean retval,
+ IgnitePredicate<GridCacheEntry<K, V>>[] filter
+ ) throws IgniteCheckedException {
+ for (K k : keys) {
+ IgniteTxEntry<K, V> txEntry = entry(cacheCtx.txKey(k));
+
+ if (txEntry == null)
+ throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+ "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
+
+ while (true) {
+ GridCacheEntryEx<K, V> cached = txEntry.cached();
+
+ try {
+ assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+ "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+ ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
+
+ if (log.isDebugEnabled())
+ log.debug("Post lock write entry: " + cached);
+
+ V v = txEntry.previousValue();
+ boolean hasPrevVal = txEntry.hasPreviousValue();
+
+ if (onePhaseCommit())
+ filter = txEntry.filters();
+
+ // If we have user-passed filter, we must read value into entry for peek().
+ if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+ retval = true;
+
+ if (retval) {
+ if (!cacheCtx.isNear()) {
+ try {
+ if (!hasPrevVal)
+ v = cached.innerGet(this,
+ /*swap*/retval,
+ /*read-through*/retval,
+ /*failFast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/true,
+ /*event*/!dht(),
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ CU.<K, V>empty());
+ }
+ catch (GridCacheFilterFailedException e) {
+ e.printStackTrace();
+
+ assert false : "Empty filter failed: " + e;
+ }
+ }
+ else {
+ if (!hasPrevVal)
+ v = retval ? cached.rawGetOrUnmarshal(false) : cached.rawGet();
+ }
+
+ ret.value(v);
+ }
+
+ boolean pass = cacheCtx.isAll(cached, filter);
+
+ // For remove operation we return true only if we are removing s/t,
+ // i.e. cached value is not null.
+ ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+ if (onePhaseCommit())
+ txEntry.filtersPassed(pass);
+
+ if (pass) {
+ txEntry.markValid();
+
+ if (log.isDebugEnabled())
+ log.debug("Filter passed in post lock for key: " + k);
+ }
+ else {
+ failed = skip(failed, k);
+
+ // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+ txEntry.setAndMarkValid(txEntry.previousOperation(), ret.value());
+ txEntry.filters(CU.<K, V>empty());
+ txEntry.filtersSet(false);
+ }
+
+ break; // While.
+ }
+ // If entry cached within transaction got removed before lock.
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
+
+ txEntry.cached(entryEx(cached.context(), txEntry.txKey()), txEntry.keyBytes());
+ }
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Entries that failed after lock filter check: " + failed);
+
+ return failed;
+ }
+
+ /**
+ * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
+ * maps must be non-null.
+ *
+ * @param map Key-value map to store.
+ * @param transformMap Transform map.
+ * @param drMap DR map.
+ * @param retval Key-transform value map to store.
+ * @param cached Cached entry, if any.
+ * @param ttl Time to live.
+ * @param filter Filter.
+ * @return Operation future.
+ */
+ private IgniteFuture<GridCacheReturn<V>> putAllAsync0(
+ final GridCacheContext<K, V> cacheCtx,
+ @Nullable Map<? extends K, ? extends V> ma
<TRUNCATED>