You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2017/03/25 00:12:43 UTC
[03/56] [abbrv] ignite git commit: Internal cache API cleanup.
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b1a4003..dc4e52f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -18,33 +18,24 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.io.Externalizable;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.cache.Cache;
-import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -58,9 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -68,36 +56,26 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -105,8 +83,6 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
-import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -164,6 +140,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@GridToStringInclude
protected IgniteTxLocalState txState;
+ /** Explicit write synchronization mode; if {@code null}, the mode is taken from the transaction state. */
+ protected CacheWriteSynchronizationMode syncMode;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -224,6 +203,23 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
}
+ /**
+ * @return Explicitly set write synchronization mode, or the transaction state's mode if none was set.
+ */
+ public final CacheWriteSynchronizationMode syncMode() {
+ if (syncMode != null)
+ return syncMode;
+
+ return txState().syncMode(cctx);
+ }
+
+ /**
+ * @param syncMode Write synchronization mode override; {@code null} falls back to the transaction state's mode.
+ */
+ public void syncMode(CacheWriteSynchronizationMode syncMode) {
+ this.syncMode = syncMode;
+ }
+
/** {@inheritDoc} */
@Override public IgniteTxState txState() {
return txState;
@@ -391,142 +387,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return null;
}
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> loadMissing(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final boolean readThrough,
- boolean async,
- final Collection<KeyCacheObject> keys,
- boolean skipVals,
- boolean needVer,
- boolean keepBinary,
- final ExpiryPolicy expiryPlc,
- final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
- ) {
- assert cacheCtx.isLocal() : cacheCtx.name();
-
- if (!readThrough || !cacheCtx.readThrough()) {
- for (KeyCacheObject key : keys)
- c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
-
- return new GridFinishedFuture<>();
- }
-
- try {
- IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
- accessPolicy(cacheCtx, keys) :
- cacheCtx.cache().expiryPolicy(expiryPlc);
-
- Map<KeyCacheObject, GridCacheVersion> misses = null;
-
- for (KeyCacheObject key : keys) {
- while (true) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
- GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
- txEntry.cached();
-
- if (entry == null)
- continue;
-
- try {
- EntryGetResult res = entry.innerGetVersioned(
- null,
- this,
- /*readSwap*/true,
- /*unmarshal*/true,
- /*update-metrics*/!skipVals,
- /*event*/!skipVals,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- expiryPlc0,
- txEntry == null ? keepBinary : txEntry.keepBinary(),
- null);
-
- if (res == null) {
- if (misses == null)
- misses = new LinkedHashMap<>();
-
- misses.put(key, entry.version());
- }
- else
- c.apply(key, skipVals ? true : res.value(), res.version());
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry, will retry: " + key);
-
- if (txEntry != null)
- txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
- }
- }
- }
-
- if (misses != null) {
- final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
-
- cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
- @Override public void apply(KeyCacheObject key, Object val) {
- GridCacheVersion ver = misses0.remove(key);
-
- assert ver != null : key;
-
- if (val != null) {
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- while (true) {
- GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
-
- try {
- EntryGetResult verVal = entry.versionedValue(cacheVal,
- ver,
- null,
- null,
- null);
-
- if (log.isDebugEnabled()) {
- log.debug("Set value loaded from store into entry [" +
- "oldVer=" + ver +
- ", newVer=" + verVal.version() +
- ", entry=" + entry + ']');
- }
-
- ver = verVal.version();
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry, (will retry): " + entry);
- }
- catch (IgniteCheckedException e) {
- // Wrap errors (will be unwrapped).
- throw new GridClosureException(e);
- }
- }
- }
- else
- ver = SER_READ_EMPTY_ENTRY_VER;
-
- c.apply(key, val, ver);
- }
- });
-
- for (KeyCacheObject key : misses0.keySet())
- c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
/**
* Gets minimum version present in transaction.
*
@@ -571,21 +431,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
}
- /** {@inheritDoc} */
- @Override public void commit() throws IgniteCheckedException {
- try {
- commitAsync().get();
- }
- finally {
- cctx.tm().resetContext();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void prepare() throws IgniteCheckedException {
- prepareAsync().get();
- }
-
/**
* Checks that locks are in proper state for commit.
*
@@ -1103,2484 +948,226 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * @param entry Entry.
- * @return {@code True} if local node is current primary for given entry.
+ * @param ctx Cache context.
+ * @param key Key.
+ * @param expiryPlc Expiry policy.
+ * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
*/
- private boolean primaryLocal(GridCacheEntryEx entry) {
- return entry.context().affinity().primaryByPartition(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+ protected IgniteCacheExpiryPolicy accessPolicy(
+ GridCacheContext ctx,
+ IgniteTxKey key,
+ @Nullable ExpiryPolicy expiryPlc
+ ) {
+ return null;
}
/**
* @param cacheCtx Cache context.
- * @param keys Key to enlist.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param map Return map.
- * @param missed Map of missed keys.
- * @param keysCnt Keys count (to avoid call to {@code Collection.size()}).
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param skipStore Skip store flag.
- * @throws IgniteCheckedException If failed.
- * @return Enlisted keys.
+ * @param keys Keys.
+ * @return Expiry policy.
*/
- @SuppressWarnings({"RedundantTypeArguments"})
- private <K, V> Collection<KeyCacheObject> enlistRead(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- @Nullable ExpiryPolicy expiryPlc,
- Map<K, V> map,
- Map<KeyCacheObject, GridCacheVersion> missed,
- int keysCnt,
- boolean deserializeBinary,
- boolean skipVals,
- boolean keepCacheObjects,
- boolean skipStore,
- final boolean needVer
- ) throws IgniteCheckedException {
- assert !F.isEmpty(keys);
- assert keysCnt == keys.size();
-
- cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
-
- boolean single = keysCnt == 1;
-
- Collection<KeyCacheObject> lockKeys = null;
-
- AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
-
- boolean needReadVer = (serializable() && optimistic()) || needVer;
-
- // In this loop we cover only read-committed or optimistic transactions.
- // Transactions that are pessimistic and not read-committed are covered
- // outside of this loop.
- for (KeyCacheObject key : keys) {
- if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
- addActiveCache(cacheCtx);
-
- IgniteTxKey txKey = cacheCtx.txKey(key);
-
- // Check write map (always check writes first).
- IgniteTxEntry txEntry = entry(txKey);
-
- // Either non-read-committed or there was a previous write.
- if (txEntry != null) {
- CacheObject val = txEntry.value();
-
- if (txEntry.hasValue()) {
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- if (val != null) {
- GridCacheVersion ver = null;
-
- if (needVer) {
- if (txEntry.op() != READ)
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
- else {
- ver = txEntry.entryReadVersion();
+ protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+ return null;
+ }
- if (ver == null && pessimistic()) {
- while (true) {
- try {
- GridCacheEntryEx cached = txEntry.cached();
+ /**
+ * Post lock processing for put or remove.
+ *
+ * @param cacheCtx Context.
+ * @param keys Keys.
+ * @param ret Return value.
+ * @param rmv {@code True} if remove.
+ * @param retval Flag to return value or not.
+ * @param read {@code True} if read.
+ * @param accessTtl TTL for read operation.
+ * @param filter Filter to check entries.
+ * @throws IgniteCheckedException If error.
+ * @param computeInvoke If {@code true} computes return value for invoke operation.
+ */
+ @SuppressWarnings("unchecked")
+ protected final void postLockWrite(
+ GridCacheContext cacheCtx,
+ Iterable<KeyCacheObject> keys,
+ GridCacheReturn ret,
+ boolean rmv,
+ boolean retval,
+ boolean read,
+ long accessTtl,
+ CacheEntryPredicate[] filter,
+ boolean computeInvoke
+ ) throws IgniteCheckedException {
+ for (KeyCacheObject k : keys) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
- ver = cached.isNear() ?
- ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
+ if (txEntry == null)
+ throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
+ "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
- break;
- }
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
- }
- }
- }
+ while (true) {
+ GridCacheEntryEx cached = txEntry.cached();
- if (ver == null) {
- assert optimistic() && repeatableRead() : this;
+ try {
+ assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
+ "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
+ ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
- ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
- }
- }
+ if (log.isDebugEnabled())
+ log.debug("Post lock write entry: " + cached);
- assert ver != null;
- }
+ CacheObject v = txEntry.previousValue();
+ boolean hasPrevVal = txEntry.hasPreviousValue();
- cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false,
- ver, 0, 0);
- }
- }
- else {
- assert txEntry.op() == TRANSFORM;
+ if (onePhaseCommit())
+ filter = txEntry.filters();
- while (true) {
- try {
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
+ // If we have user-passed filter, we must read value into entry for peek().
+ if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
+ retval = true;
- Object transformClo =
- (txEntry.op() == TRANSFORM &&
- cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.entryProcessors()) : null;
+ boolean invoke = txEntry.op() == TRANSFORM;
- if (needVer) {
- getRes = txEntry.cached().innerGetVersioned(
- null,
- this,
- /*swap*/true,
- /*unmarshal*/true,
- /*update-metrics*/true,
- /*event*/!skipVals,
- CU.subjectId(this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary(),
- null);
+ if (retval || invoke) {
+ if (!cacheCtx.isNear()) {
+ if (!hasPrevVal) {
+ // For non-local cache should read from store after lock on primary.
+ boolean readThrough = cacheCtx.isLocal() &&
+ (invoke || cacheCtx.loadPreviousValue()) &&
+ !txEntry.skipStore();
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
- }
- }
- else {
- val = txEntry.cached().innerGet(
+ v = cached.innerGet(
null,
this,
/*swap*/true,
- /*read-through*/false,
- /*metrics*/true,
- /*event*/!skipVals,
+ readThrough,
+ /*metrics*/!invoke,
+ /*event*/!invoke && !dht(),
/*temporary*/false,
CU.subjectId(this, cctx),
- transformClo,
+ null,
resolveTaskName(),
null,
txEntry.keepBinary());
}
-
- if (val != null) {
- if (!readCommitted() && !skipVals)
- txEntry.readValue(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
- }
- else
- missed.put(key, txEntry.cached().version());
-
- break;
}
- catch (GridCacheEntryRemovedException ignored) {
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ else {
+ if (!hasPrevVal)
+ v = cached.rawGetOrUnmarshal(false);
}
- }
- }
- }
- // First time access within transaction.
- else {
- if (lockKeys == null && !skipVals)
- lockKeys = single ? Collections.singleton(key) : new ArrayList<KeyCacheObject>(keysCnt);
- if (!single && !skipVals)
- lockKeys.add(key);
+ if (txEntry.op() == TRANSFORM) {
+ if (computeInvoke) {
+ GridCacheVersion ver;
+
+ try {
+ ver = cached.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
- try {
- GridCacheVersion ver = entry.version();
-
- CacheObject val = null;
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
-
- if (!pessimistic() || readCommitted() && !skipVals) {
- IgniteCacheExpiryPolicy accessPlc =
- optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
-
- if (needReadVer) {
- getRes = primaryLocal(entry) ?
- entry.innerGetVersioned(
- null,
- this,
- /*swap*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*event*/true,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary,
- null) : null;
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
+ ver = null;
}
- }
- else {
- val = entry.innerGet(
- null,
- this,
- /*swap*/true,
- /*read-through*/false,
- /*metrics*/true,
- /*event*/true,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc,
- !deserializeBinary);
- }
- if (val != null) {
- cacheCtx.addResult(map,
- key,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
+ addInvokeResult(txEntry, v, ret, ver);
}
- else
- missed.put(key, ver);
}
else
- // We must wait for the lock in pessimistic mode.
- missed.put(key, ver);
-
- if (!readCommitted() && !skipVals) {
- txEntry = addEntry(READ,
- val,
- null,
- null,
- entry,
- expiryPlc,
- null,
- true,
- -1L,
- -1L,
- null,
- skipStore,
- !deserializeBinary);
-
- // As optimization, mark as checked immediately
- // for non-pessimistic if value is not null.
- if (val != null && !pessimistic()) {
- txEntry.markValid();
-
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(readVer);
- }
- }
- }
-
- break; // While.
+ ret.value(cacheCtx, v, txEntry.keepBinary());
}
- catch (GridCacheEntryRemovedException ignored) {
+
+ boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
+
+ // For remove operation we return true only if we are removing s/t,
+ // i.e. cached value is not null.
+ ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
+
+ if (onePhaseCommit())
+ txEntry.filtersPassed(pass);
+
+ boolean updateTtl = read;
+
+ if (pass) {
+ txEntry.markValid();
+
if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
+ log.debug("Filter passed in post lock for key: " + k);
}
- finally {
- if (entry != null && readCommitted()) {
- if (cacheCtx.isNear()) {
- if (cacheCtx.affinity().partitionBelongs(cacheCtx.localNode(), entry.partition(), topVer)) {
- if (entry.markObsolete(xidVer))
- cacheCtx.cache().removeEntry(entry);
- }
- }
- else
- entry.context().evicts().touch(entry, topVer);
- }
+ else {
+ // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
+ txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
+ txEntry.filters(CU.empty0());
+ txEntry.filtersSet(false);
+
+ updateTtl = !cacheCtx.putIfAbsentFilter(filter);
}
- }
- }
- }
- return lockKeys != null ? lockKeys : Collections.<KeyCacheObject>emptyList();
- }
+ if (updateTtl) {
+ if (!read) {
+ ExpiryPolicy expiryPlc = cacheCtx.expiryForTxEntry(txEntry);
- /**
- * @param ctx Cache context.
- * @param key Key.
- * @param expiryPlc Expiry policy.
- * @return Expiry policy wrapper for entries accessed locally in optimistic transaction.
- */
- protected IgniteCacheExpiryPolicy accessPolicy(
- GridCacheContext ctx,
- IgniteTxKey key,
- @Nullable ExpiryPolicy expiryPlc
- ) {
- return null;
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys.
- * @return Expiry policy.
- */
- protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
- return null;
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param topVer Topology version.
- * @param map Return map.
- * @param missedMap Missed keys.
- * @param deserializeBinary Deserialize binary flag.
- * @param skipVals Skip values flag.
- * @param keepCacheObjects Keep cache objects flag.
- * @param skipStore Skip store flag.
- * @param expiryPlc Expiry policy.
- * @return Loaded key-value pairs.
- */
- private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final Map<K, V> map,
- final Map<KeyCacheObject, GridCacheVersion> missedMap,
- final boolean deserializeBinary,
- final boolean skipVals,
- final boolean keepCacheObjects,
- final boolean skipStore,
- final boolean needVer,
- final ExpiryPolicy expiryPlc
-
- ) {
- if (log.isDebugEnabled())
- log.debug("Loading missed values for missed map: " + missedMap);
-
- final boolean needReadVer = (serializable() && optimistic()) || needVer;
-
- return new GridEmbeddedFuture<>(
- new C2<Void, Exception, Map<K, V>>() {
- @Override public Map<K, V> apply(Void v, Exception e) {
- if (e != null) {
- setRollbackOnly();
-
- throw new GridClosureException(e);
- }
-
- return map;
- }
- },
- loadMissing(
- cacheCtx,
- topVer,
- !skipStore,
- false,
- missedMap.keySet(),
- skipVals,
- needReadVer,
- !deserializeBinary,
- expiryPlc,
- new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
- if (isRollbackOnly()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring loaded value for read because transaction was rolled back: " +
- IgniteTxLocalAdapter.this);
-
- return;
- }
-
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- CacheObject visibleVal = cacheVal;
-
- IgniteTxKey txKey = cacheCtx.txKey(key);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- if (txEntry != null) {
- if (!readCommitted())
- txEntry.readValue(cacheVal);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- visibleVal = txEntry.applyEntryProcessors(visibleVal);
- }
-
- assert txEntry != null || readCommitted() || skipVals;
-
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
-
- if (readCommitted() || skipVals) {
- cacheCtx.evicts().touch(e, topologyVersion());
-
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- needVer ? loadVer : null,
- 0,
- 0);
- }
- }
- else {
- assert txEntry != null;
-
- txEntry.setAndMarkValid(cacheVal);
-
- if (needReadVer) {
- assert loadVer != null;
-
- txEntry.entryReadVersion(loadVer);
- }
-
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- needVer ? loadVer : null,
- 0,
- 0);
- }
- }
- }
- })
- );
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
- final GridCacheContext cacheCtx,
- @Nullable final AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
- final boolean deserializeBinary,
- final boolean skipVals,
- final boolean keepCacheObjects,
- final boolean skipStore,
- final boolean needVer) {
- if (F.isEmpty(keys))
- return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
-
- init();
-
- int keysCnt = keys.size();
-
- boolean single = keysCnt == 1;
-
- try {
- checkValid();
-
- final Map<K, V> retMap = new GridLeanMap<>(keysCnt);
-
- final Map<KeyCacheObject, GridCacheVersion> missed = new GridLeanMap<>(pessimistic() ? keysCnt : 0);
-
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
- ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
-
- final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
- entryTopVer,
- keys,
- expiryPlc,
- retMap,
- missed,
- keysCnt,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer);
-
- if (single && missed.isEmpty())
- return new GridFinishedFuture<>(retMap);
-
- // Handle locks.
- if (pessimistic() && !readCommitted() && !skipVals) {
- if (expiryPlc == null)
- expiryPlc = cacheCtx.expiry();
-
- long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED;
- long createTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForCreation()) : CU.TTL_NOT_CHANGED;
-
- long timeout = remainingTime();
-
- if (timeout == -1)
- return new GridFinishedFuture<>(timeoutException());
-
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys,
- timeout,
- this,
- true,
- true,
- isolation,
- isInvalidate(),
- createTtl,
- accessTtl);
-
- final ExpiryPolicy expiryPlc0 = expiryPlc;
-
- PLC2<Map<K, V>> plc2 = new PLC2<Map<K, V>>() {
- @Override public IgniteInternalFuture<Map<K, V>> postLock() throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Acquired transaction lock for read on keys: " + lockKeys);
-
- // Load keys only after the locks have been acquired.
- for (KeyCacheObject cacheKey : lockKeys) {
- K keyVal = (K)
- (keepCacheObjects ? cacheKey :
- cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
-
- if (retMap.containsKey(keyVal))
- // We already have a return value.
- continue;
-
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- assert txEntry != null;
-
- // Check if there is cached value.
- while (true) {
- GridCacheEntryEx cached = txEntry.cached();
-
- CacheObject val = null;
- GridCacheVersion readVer = null;
- EntryGetResult getRes = null;
-
- try {
- Object transformClo =
- (!F.isEmpty(txEntry.entryProcessors()) &&
- cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ)) ?
- F.first(txEntry.entryProcessors()) : null;
-
- if (needVer) {
- getRes = cached.innerGetVersioned(
- null,
- IgniteTxLocalAdapter.this,
- /*swap*/cacheCtx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
- /*update-metrics*/true,
- /*event*/!skipVals,
- CU.subjectId(IgniteTxLocalAdapter.this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary(),
- null);
-
- if (getRes != null) {
- val = getRes.value();
- readVer = getRes.version();
- }
- }
- else{
- val = cached.innerGet(
- null,
- IgniteTxLocalAdapter.this,
- cacheCtx.isSwapOrOffheapEnabled(),
- /*read-through*/false,
- /*metrics*/true,
- /*events*/!skipVals,
- /*temporary*/false,
- CU.subjectId(IgniteTxLocalAdapter.this, cctx),
- transformClo,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
- }
-
- // If value is in cache and passed the filter.
- if (val != null) {
- missed.remove(cacheKey);
-
- txEntry.setAndMarkValid(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(retMap,
- cacheKey,
- val,
- skipVals,
- keepCacheObjects,
- deserializeBinary,
- false,
- getRes,
- readVer,
- 0,
- 0,
- needVer);
-
- if (readVer != null)
- txEntry.entryReadVersion(readVer);
- }
-
- // Even though we bring the value back from lock acquisition,
- // we still need to recheck primary node for consistent values
- // in case of concurrent transactional locks.
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed exception in get postLock (will retry): " +
- cached);
-
- txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
- }
- }
- }
-
- if (!missed.isEmpty() && cacheCtx.isLocal()) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return checkMissed(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- retMap,
- missed,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer,
- expiryPlc0);
- }
-
- return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
- }
- };
-
- FinishClosure<Map<K, V>> finClos = new FinishClosure<Map<K, V>>() {
- @Override Map<K, V> finish(Map<K, V> loaded) {
- retMap.putAll(loaded);
-
- return retMap;
- }
- };
-
- if (fut.isDone()) {
- try {
- IgniteInternalFuture<Map<K, V>> fut1 = plc2.apply(fut.get(), null);
-
- return fut1.isDone() ?
- new GridFinishedFuture<>(finClos.apply(fut1.get(), null)) :
- new GridEmbeddedFuture<>(finClos, fut1);
- }
- catch (GridClosureException e) {
- return new GridFinishedFuture<>(e.unwrap());
- }
- catch (IgniteCheckedException e) {
- try {
- return plc2.apply(false, e);
- }
- catch (Exception e1) {
- return new GridFinishedFuture<>(e1);
- }
- }
- }
- else {
- return new GridEmbeddedFuture<>(
- fut,
- plc2,
- finClos);
- }
- }
- else {
- assert optimistic() || readCommitted() || skipVals;
-
- if (!missed.isEmpty()) {
- if (!readCommitted())
- for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
- KeyCacheObject cacheKey = it.next();
-
- K keyVal =
- (K)(keepCacheObjects ? cacheKey : cacheKey.value(cacheCtx.cacheObjectContext(), false));
-
- if (retMap.containsKey(keyVal))
- it.remove();
- }
-
- if (missed.isEmpty())
- return new GridFinishedFuture<>(retMap);
-
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return checkMissed(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- retMap,
- missed,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- needVer,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>(retMap);
- }
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
-
- return new GridFinishedFuture<>(e);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Map<? extends K, ? extends V> map,
- boolean retval
- ) {
- return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
- entryTopVer,
- map,
- null,
- null,
- null,
- retval);
- }
-
- /** {@inheritDoc} */
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- V val,
- boolean retval,
- CacheEntryPredicate filter) {
- return putAsync0(cacheCtx,
- entryTopVer,
- key,
- val,
- null,
- null,
- retval,
- filter);
- }
-
- /** {@inheritDoc} */
- @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- K key,
- EntryProcessor<K, V, Object> entryProcessor,
- Object... invokeArgs) {
- return (IgniteInternalFuture)putAsync0(cacheCtx,
- entryTopVer,
- key,
- null,
- entryProcessor,
- invokeArgs,
- true,
- null);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> putAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheDrInfo> drMap
- ) {
- Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
- @Override public Object apply(GridCacheDrInfo val) {
- return val.value();
- }
- });
-
- return this.<Object, Object>putAllAsync0(cacheCtx,
- null,
- map,
- null,
- null,
- drMap,
- false);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
- GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
- Object... invokeArgs
- ) {
- return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
- entryTopVer,
- null,
- map,
- invokeArgs,
- null,
- true);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> removeAllDrAsync(
- GridCacheContext cacheCtx,
- Map<KeyCacheObject, GridCacheVersion> drMap
- ) {
- return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
- }
-
- /**
- * Checks filter for non-pessimistic transactions.
- *
- * @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param filter Filter to check.
- * @return {@code True} if passed or pessimistic.
- */
- private boolean filter(
- GridCacheContext cctx,
- KeyCacheObject key,
- CacheObject val,
- CacheEntryPredicate[] filter) {
- return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param cacheKey Key to enlist.
- * @param val Value.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param entryProcessor Entry processor (for invoke operation).
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param retval Flag indicating whether a value should be returned.
- * @param lockOnly If {@code true}, then entry will be enlisted as noop.
- * @param filter User filters.
- * @param ret Return value.
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @return Future for entry values loading.
- */
- private <K, V> IgniteInternalFuture<Void> enlistWrite(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- KeyCacheObject cacheKey,
- Object val,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable EntryProcessor<K, V, Object> entryProcessor,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- boolean skipStore,
- final boolean singleRmv,
- boolean keepBinary,
- Byte dataCenterId) {
- try {
- addActiveCache(cacheCtx);
-
- final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
- final boolean needVal = singleRmv || retval || hasFilters;
- final boolean needReadVer = needVal && (serializable() && optimistic());
-
- if (entryProcessor != null)
- transform = true;
-
- GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
-
- boolean loadMissed = enlistWriteEntry(cacheCtx,
- entryTopVer,
- cacheKey,
- val,
- entryProcessor,
- invokeArgs,
- expiryPlc,
- retval,
- lockOnly,
- filter,
- /*drVer*/drVer,
- /*drTtl*/-1L,
- /*drExpireTime*/-1L,
- ret,
- /*enlisted*/null,
- skipStore,
- singleRmv,
- hasFilters,
- needVal,
- needReadVer,
- keepBinary);
-
- if (loadMissed) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return loadMissing(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- Collections.singleton(cacheKey),
- filter,
- ret,
- needReadVer,
- singleRmv,
- hasFilters,
- /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
- retval,
- keepBinary,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * Internal routine for <tt>putAll(..)</tt>
- *
- * @param cacheCtx Cache context.
- * @param keys Keys to enlist.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param lookup Value lookup map ({@code null} for remove).
- * @param invokeMap Map with entry processors for invoke operation.
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param retval Flag indicating whether a value should be returned.
- * @param lockOnly If {@code true}, then entry will be enlisted as noop.
- * @param filter User filters.
- * @param ret Return value.
- * @param enlisted Collection of keys enlisted into this transaction.
- * @param drPutMap DR put map (optional).
- * @param drRmvMap DR remove map (optional).
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @param keepBinary Keep binary flag.
- * @param dataCenterId Optional data center ID.
- * @return Future for missing values loading.
- */
- private <K, V> IgniteInternalFuture<Void> enlistWrite(
- final GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- Collection<?> keys,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable Map<?, ?> lookup,
- @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- Collection<KeyCacheObject> enlisted,
- @Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
- @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
- boolean skipStore,
- final boolean singleRmv,
- final boolean keepBinary,
- Byte dataCenterId
- ) {
- assert retval || invokeMap == null;
-
- try {
- addActiveCache(cacheCtx);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
-
- boolean rmv = lookup == null && invokeMap == null;
-
- final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
- final boolean needVal = singleRmv || retval || hasFilters;
- final boolean needReadVer = needVal && (serializable() && optimistic());
-
- try {
- // Set transform flag for transaction.
- if (invokeMap != null)
- transform = true;
-
- Set<KeyCacheObject> missedForLoad = null;
-
- for (Object key : keys) {
- if (key == null) {
- rollback();
-
- throw new NullPointerException("Null key.");
- }
-
- Object val = rmv || lookup == null ? null : lookup.get(key);
- EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key);
-
- GridCacheVersion drVer;
- long drTtl;
- long drExpireTime;
-
- if (drPutMap != null) {
- GridCacheDrInfo info = drPutMap.get(key);
-
- assert info != null;
-
- drVer = info.version();
- drTtl = info.ttl();
- drExpireTime = info.expireTime();
- }
- else if (drRmvMap != null) {
- assert drRmvMap.get(key) != null;
-
- drVer = drRmvMap.get(key);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else if (dataCenterId != null) {
- drVer = cctx.versions().next(dataCenterId);
- drTtl = -1L;
- drExpireTime = -1L;
- }
- else {
- drVer = null;
- drTtl = -1L;
- drExpireTime = -1L;
- }
-
- if (!rmv && val == null && entryProcessor == null) {
- setRollbackOnly();
-
- throw new NullPointerException("Null value.");
- }
-
- KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
-
- boolean loadMissed = enlistWriteEntry(cacheCtx,
- entryTopVer,
- cacheKey,
- val,
- entryProcessor,
- invokeArgs,
- expiryPlc,
- retval,
- lockOnly,
- filter,
- drVer,
- drTtl,
- drExpireTime,
- ret,
- enlisted,
- skipStore,
- singleRmv,
- hasFilters,
- needVal,
- needReadVer,
- keepBinary);
-
- if (loadMissed) {
- if (missedForLoad == null)
- missedForLoad = new HashSet<>();
-
- missedForLoad.add(cacheKey);
- }
- }
-
- if (missedForLoad != null) {
- AffinityTopologyVersion topVer = topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = entryTopVer;
-
- return loadMissing(cacheCtx,
- topVer != null ? topVer : topologyVersion(),
- missedForLoad,
- filter,
- ret,
- needReadVer,
- singleRmv,
- hasFilters,
- /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore,
- retval,
- keepBinary,
- expiryPlc);
- }
-
- return new GridFinishedFuture<>();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param keys Keys to load.
- * @param filter Filter.
- * @param ret Return value.
- * @param needReadVer Read version flag.
- * @param singleRmv {@code True} for single remove operation.
- * @param hasFilters {@code True} if filters not empty.
- * @param readThrough Read through flag.
- * @param retval Return value flag.
- * @param expiryPlc Expiry policy.
- * @return Load future.
- */
- private IgniteInternalFuture<Void> loadMissing(
- final GridCacheContext cacheCtx,
- final AffinityTopologyVersion topVer,
- final Set<KeyCacheObject> keys,
- final CacheEntryPredicate[] filter,
- final GridCacheReturn ret,
- final boolean needReadVer,
- final boolean singleRmv,
- final boolean hasFilters,
- final boolean readThrough,
- final boolean retval,
- final boolean keepBinary,
- final ExpiryPolicy expiryPlc) {
- GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
- new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer) {
- if (log.isDebugEnabled())
- log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
-
- IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
-
- assert e != null;
-
- if (needReadVer) {
- assert loadVer != null;
-
- e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
- }
-
- if (singleRmv) {
- assert !hasFilters && !retval;
- assert val == null || Boolean.TRUE.equals(val) : val;
-
- ret.set(cacheCtx, null, val != null, keepBinary);
- }
- else {
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
-
- if (e.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- e.readValue(cacheVal);
-
- try {
- ver = e.cached().version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : e;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(e, cacheVal, ret, ver);
- }
- else {
- boolean success;
-
- if (hasFilters) {
- success = isAll(e.context(), key, cacheVal, filter);
-
- if (!success)
- e.value(cacheVal, false, false);
- }
- else
- success = true;
-
- ret.set(cacheCtx, cacheVal, success, keepBinary);
- }
- }
- }
- };
-
- return loadMissing(
- cacheCtx,
- topVer,
- readThrough,
- /*async*/true,
- keys,
- /*skipVals*/singleRmv,
- needReadVer,
- keepBinary,
- expiryPlc,
- c);
- }
-
- /**
- * @param cacheCtx Cache context.
- * @param cacheKey Key.
- * @param val Value.
- * @param entryProcessor Entry processor.
- * @param invokeArgs Optional arguments for EntryProcessor.
- * @param expiryPlc Explicitly specified expiry policy for entry.
- * @param retval Return value flag.
- * @param lockOnly Lock only flag.
- * @param filter Filter.
- * @param drVer DR version.
- * @param drTtl DR ttl.
- * @param drExpireTime DR expire time.
- * @param ret Return value.
- * @param enlisted Enlisted keys collection.
- * @param skipStore Skip store flag.
- * @param singleRmv {@code True} for single remove operation.
- * @param hasFilters {@code True} if filters not empty.
- * @param needVal {@code True} if value is needed.
- * @param needReadVer {@code True} if need read entry version.
- * @return {@code True} if entry value should be loaded.
- * @throws IgniteCheckedException If failed.
- */
- private boolean enlistWriteEntry(GridCacheContext cacheCtx,
- @Nullable AffinityTopologyVersion entryTopVer,
- final KeyCacheObject cacheKey,
- @Nullable final Object val,
- @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
- @Nullable final Object[] invokeArgs,
- @Nullable final ExpiryPolicy expiryPlc,
- final boolean retval,
- final boolean lockOnly,
- final CacheEntryPredicate[] filter,
- final GridCacheVersion drVer,
- final long drTtl,
- long drExpireTime,
- final GridCacheReturn ret,
- @Nullable final Collection<KeyCacheObject> enlisted,
- boolean skipStore,
- boolean singleRmv,
- boolean hasFilters,
- final boolean needVal,
- boolean needReadVer,
- boolean keepBinary
- ) throws IgniteCheckedException {
- boolean loadMissed = false;
-
- final boolean rmv = val == null && entryProcessor == null;
-
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- // First time access.
- if (txEntry == null) {
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
-
- try {
- entry.unswap(false);
-
- // Check if lock is being explicitly acquired by the same thread.
- if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
- entry.lockedByThread(threadId, xidVer)) {
- throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
- "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
- ", entry=" + entry +
- ", xidVer=" + xidVer +
- ", threadId=" + threadId +
- ", locNodeId=" + cctx.localNodeId() + ']');
- }
-
- CacheObject old = null;
- GridCacheVersion readVer = null;
-
- if (optimistic() && !implicit()) {
- try {
- if (needReadVer) {
- EntryGetResult res = primaryLocal(entry) ?
- entry.innerGetVersioned(
- null,
- this,
- /*swap*/false,
- /*unmarshal*/retval || needVal,
- /*metrics*/retval,
- /*events*/retval,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary,
- null) : null;
-
- if (res != null) {
- old = res.value();
- readVer = res.version();
- }
- }
- else {
- old = entry.innerGet(
- null,
- this,
- /*swap*/false,
- /*read-through*/false,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null,
- keepBinary);
- }
- }
- catch (ClusterTopologyCheckedException e) {
- entry.context().evicts().touch(entry, topologyVersion());
-
- throw e;
- }
- }
- else
- old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
-
- final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
- entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
-
- if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
- ret.set(cacheCtx, old, false, keepBinary);
-
- if (!readCommitted()) {
- if (optimistic() && serializable()) {
- txEntry = addEntry(op,
- old,
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
- }
- else {
- txEntry = addEntry(READ,
- old,
- null,
- null,
- entry,
- null,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore,
- keepBinary);
- }
-
- txEntry.markValid();
-
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
- }
-
- if (readCommitted())
- cacheCtx.evicts().touch(entry, topologyVersion());
-
- break; // While.
- }
-
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
-
- if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
- cacheCtx.evicts().touch(entry, topologyVersion());
-
- if (enlisted != null)
- enlisted.add(cacheKey);
-
- if (!pessimistic() && !implicit()) {
- txEntry.markValid();
-
- if (old == null) {
- if (needVal)
- loadMissed = true;
- else {
- assert !implicit() || !transform : this;
- assert txEntry.op() != TRANSFORM : txEntry;
-
- if (retval)
- ret.set(cacheCtx, null, true, keepBinary);
- else
- ret.success(true);
- }
- }
- else {
- if (needReadVer) {
- assert readVer != null;
-
- txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
-
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else {
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version " +
- "[err=" + ex.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, old, ret, ver);
- }
- else
- ret.success(true);
- }
- }
- }
- // Pessimistic.
- else {
- if (retval && !transform)
- ret.set(cacheCtx, old, true, keepBinary);
- else
- ret.success(true);
- }
-
- break; // While.
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction putAll0 method: " + entry);
- }
- }
- }
- else {
- if (entryProcessor == null && txEntry.op() == TRANSFORM)
- throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
-
- GridCacheEntryEx entry = txEntry.cached();
-
- CacheObject v = txEntry.value();
-
- boolean del = txEntry.op() == DELETE && rmv;
-
- if (!del) {
- if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
- ret.set(cacheCtx, v, false, keepBinary);
-
- return loadMissed;
- }
-
- GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
- v != null ? UPDATE : CREATE;
-
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore,
- keepBinary);
-
- if (enlisted != null)
- enlisted.add(cacheKey);
-
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
-
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, txEntry.value(), ret, ver);
- }
- }
-
- if (!pessimistic()) {
- txEntry.markValid();
-
- if (retval && !transform)
- ret.set(cacheCtx, v, true, keepBinary);
- else
- ret.success(true);
- }
- }
-
- return loadMissed;
- }
-
- /**
- * @param cctx Cache context.
- * @param key Key.
- * @param val Value.
- * @param filter Filter.
- * @return {@code True} if filter passed.
- */
- private boolean isAll(GridCacheContext cctx,
- KeyCacheObject key,
- CacheObject val,
- CacheEntryPredicate[] filter) {
- GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
- @Nullable @Override public CacheObject peekVisibleValue() {
- return rawGet();
- }
- };
-
- for (CacheEntryPredicate p0 : filter) {
- if (p0 != null && !p0.apply(e))
- return false;
- }
-
- return true;
- }
-
- /**
- * Post lock processing for put or remove.
- *
- * @param cacheCtx Context.
- * @param keys Keys.
- * @param ret Return value.
- * @param rmv {@code True} if remove.
- * @param retval Flag to return value or not.
- * @param read {@code True} if read.
- * @param accessTtl TTL for read operation.
- * @param filter Filter to check entries.
- * @throws IgniteCheckedException If error.
- * @param computeInvoke If {@code true} computes return value for invoke operation.
- */
- @SuppressWarnings("unchecked")
- protected final void postLockWrite(
- GridCacheContext cacheCtx,
- Iterable<KeyCacheObject> keys,
- GridCacheReturn ret,
- boolean rmv,
- boolean retval,
- boolean read,
- long accessTtl,
- CacheEntryPredicate[] filter,
- boolean computeInvoke
- ) throws IgniteCheckedException {
- for (KeyCacheObject k : keys) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(k));
-
- if (txEntry == null)
- throw new IgniteCheckedException("Transaction entry is null (most likely collection of keys passed into cache " +
- "operation was changed before operation completed) [missingKey=" + k + ", tx=" + this + ']');
-
- while (true) {
- GridCacheEntryEx cached = txEntry.cached();
-
- try {
- assert cached.detached() || cached.lockedByThread(threadId) || isRollbackOnly() :
- "Transaction lock is not acquired [entry=" + cached + ", tx=" + this +
- ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']';
-
- if (log.isDebugEnabled())
- log.debug("Post lock write entry: " + cached);
-
- CacheObject v = txEntry.previousValue();
- boolean hasPrevVal = txEntry.hasPreviousValue();
-
- if (onePhaseCommit())
- filter = txEntry.filters();
-
- // If we have user-passed filter, we must read value into entry for peek().
- if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter))
- retval = true;
-
- boolean invoke = txEntry.op() == TRANSFORM;
-
- if (retval || invoke) {
- if (!cacheCtx.isNear()) {
- if (!hasPrevVal) {
- // For non-local cache should read from store after lock on primary.
- boolean readThrough = cacheCtx.isLocal() &&
- (invoke || cacheCtx.loadPreviousValue()) &&
- !txEntry.skipStore();
-
- v = cached.innerGet(
- null,
- this,
- /*swap*/true,
- readThrough,
- /*metrics*/!invoke,
- /*event*/!invoke && !dht(),
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- null,
- txEntry.keepBinary());
- }
- }
- else {
- if (!hasPrevVal)
- v = cached.rawGetOrUnmarshal(false);
- }
-
- if (txEntry.op() == TRANSFORM) {
- if (computeInvoke) {
- GridCacheVersion ver;
-
- try {
- ver = cached.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
-
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
-
- ver = null;
- }
-
- addInvokeResult(txEntry, v, ret, ver);
- }
- }
- else
- ret.value(cacheCtx, v, txEntry.keepBinary());
- }
-
- boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter);
-
- // For remove operation we return true only if we are removing s/t,
- // i.e. cached value is not null.
- ret.success(pass && (!retval ? !rmv || cached.hasValue() || v != null : !rmv || v != null));
-
- if (onePhaseCommit())
- txEntry.filtersPassed(pass);
-
- boolean updateTtl = read;
-
- if (pass) {
- txEntry.markValid();
-
- if (log.isDebugEnabled())
- log.debug("Filter passed in post lock for key: " + k);
- }
- else {
- // Revert operation to previous. (if no - NOOP, so entry will be unlocked).
- txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
- txEntry.filters(CU.empty0());
- txEntry.filtersSet(false);
-
- updateTtl = !cacheCtx.putIfAbsentFilter(filter);
- }
-
- if
<TRUNCATED>