You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ra...@apache.org on 2015/10/28 13:07:06 UTC
[25/31] ignite git commit: ignite-1607 Implemented deadlock-free
optimistic serializable tx mode
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index f22e753..82e5f2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -31,12 +31,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,7 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -85,10 +86,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -105,6 +104,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.RELOAD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
@@ -424,46 +425,126 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Boolean> loadMissing(
+ @Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
final boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
- boolean deserializePortable,
boolean skipVals,
- final IgniteBiInClosure<KeyCacheObject, Object> c
+ boolean needVer,
+ final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
) {
- if (!async) {
- try {
- if (!readThrough || !cacheCtx.readThrough()) {
- for (KeyCacheObject key : keys)
- c.apply(key, null);
+ assert cacheCtx.isLocal() : cacheCtx.name();
- return new GridFinishedFuture<>(false);
- }
+ if (!readThrough || !cacheCtx.readThrough()) {
+ for (KeyCacheObject key : keys)
+ c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
- return new GridFinishedFuture<>(
- cacheCtx.store().loadAll(this, keys, c));
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
+ return new GridFinishedFuture<>();
}
- else {
- return cctx.kernalContext().closure().callLocalSafe(
- new GPC<Boolean>() {
- @Override public Boolean call() throws Exception {
- if (!readThrough || !cacheCtx.readThrough()) {
- for (KeyCacheObject key : keys)
- c.apply(key, null);
-
- return false;
+
+ try {
+ IgniteCacheExpiryPolicy expiryPlc = accessPolicy(cacheCtx, keys);
+
+ Map<KeyCacheObject, GridCacheVersion> misses = null;
+
+ for (KeyCacheObject key : keys) {
+ while (true) {
+ IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
+
+ GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().entryEx(key) :
+ txEntry.cached();
+
+ if (entry == null)
+ continue;
+
+ try {
+ T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this,
+ /*readSwap*/true,
+ /*unmarshal*/true,
+ /*update-metrics*/!skipVals,
+ /*event*/!skipVals,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ expiryPlc);
+
+ if (res == null) {
+ if (misses == null)
+ misses = new LinkedHashMap<>();
+
+ misses.put(key, entry.version());
}
+ else
+ c.apply(key, skipVals ? true : res.get1(), res.get2());
- return cacheCtx.store().loadAll(IgniteTxLocalAdapter.this, keys, c);
+ break;
}
- },
- true);
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry, will retry: " + key);
+
+ if (txEntry != null)
+ txEntry.cached(cacheCtx.cache().entryEx(key));
+ }
+ }
+ }
+
+ if (misses != null) {
+ final Map<KeyCacheObject, GridCacheVersion> misses0 = misses;
+
+ cacheCtx.store().loadAll(this, misses.keySet(), new CI2<KeyCacheObject, Object>() {
+ @Override public void apply(KeyCacheObject key, Object val) {
+ GridCacheVersion ver = misses0.remove(key);
+
+ assert ver != null : key;
+
+ if (val != null) {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ while (true) {
+ GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+
+ try {
+ GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
+
+ boolean set = setVer != null;
+
+ if (set)
+ ver = setVer;
+
+ if (log.isDebugEnabled())
+ log.debug("Set value loaded from store into entry [set=" + set +
+ ", curVer=" + ver + ", newVer=" + setVer + ", " +
+ "entry=" + entry + ']');
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry, (will retry): " + entry);
+ }
+ catch (IgniteCheckedException e) {
+ // Wrap errors (will be unwrapped).
+ throw new GridClosureException(e);
+ }
+ }
+ }
+ else
+ ver = SER_READ_EMPTY_ENTRY_VER;
+
+ c.apply(key, val, ver);
+ }
+ });
+
+ for (KeyCacheObject key : misses0.keySet())
+ c.apply(key, null, SER_READ_EMPTY_ENTRY_VER);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
}
}
@@ -834,13 +915,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry,
true);
+ GridCacheVersion dhtVer = null;
+
// For near local transactions we must record DHT version
// in order to keep near entries on backup nodes until
// backup remote transaction completes.
if (cacheCtx.isNear()) {
if (txEntry.op() == CREATE || txEntry.op() == UPDATE ||
txEntry.op() == DELETE || txEntry.op() == TRANSFORM)
- ((GridNearCacheEntry)cached).recordDhtVersion(txEntry.dhtVersion());
+ dhtVer = txEntry.dhtVersion();
if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) &&
txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) {
@@ -921,6 +1004,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.conflictVersion(explicitVer);
}
+ if (dhtVer == null)
+ dhtVer = explicitVer != null ? explicitVer : writeVersion();
+
if (op == CREATE || op == UPDATE) {
GridCacheUpdateTxResult updRes = cached.innerSet(
this,
@@ -938,9 +1024,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.conflictExpireTime(),
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
- resolveTaskName());
+ resolveTaskName(),
+ dhtVer);
- if (nearCached != null && updRes.success())
+ if (nearCached != null && updRes.success()) {
nearCached.innerSet(
null,
eventNodeId(),
@@ -957,7 +1044,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.conflictExpireTime(),
null,
CU.subjectId(this, cctx),
- resolveTaskName());
+ resolveTaskName(),
+ dhtVer);
+ }
}
else if (op == DELETE) {
GridCacheUpdateTxResult updRes = cached.innerRemove(
@@ -973,9 +1062,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cached.detached() ? DR_NONE : drType,
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
- resolveTaskName());
+ resolveTaskName(),
+ dhtVer);
- if (nearCached != null && updRes.success())
+ if (nearCached != null && updRes.success()) {
nearCached.innerRemove(
null,
eventNodeId(),
@@ -989,7 +1079,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
DR_NONE,
null,
CU.subjectId(this, cctx),
- resolveTaskName());
+ resolveTaskName(),
+ dhtVer);
+ }
}
else if (op == RELOAD) {
cached.innerReload();
@@ -1180,14 +1272,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
commitErr.get());
}
+ if (near()) {
+ // Must evict near entries before rolling back from
+ // transaction manager, so they will be removed from cache.
+ for (IgniteTxEntry e : allEntries())
+ evictNearEntry(e, false);
+ }
+
if (doneFlag.compareAndSet(false, true)) {
try {
- if (near())
- // Must evict near entries before rolling back from
- // transaction manager, so they will be removed from cache.
- for (IgniteTxEntry e : allEntries())
- evictNearEntry(e, false);
-
cctx.tm().rollbackTx(this);
if (!internal()) {
@@ -1228,12 +1321,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param entry Entry.
+ * @return {@code True} if local node is current primary for given entry.
+ */
+ private boolean primaryLocal(GridCacheEntryEx entry) {
+ return entry.context().affinity().primary(cctx.localNode(), entry.partition(), AffinityTopologyVersion.NONE);
+ }
+
+ /**
* Checks if there is a cached or swapped value for
- * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param keys Key to enlist.
- * @param cached Cached entry, if called from entry wrapper.
* @param expiryPlc Explicitly specified expiry policy for entry.
* @param map Return map.
* @param missed Map of missed keys.
@@ -1249,7 +1349,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
private <K, V> Collection<KeyCacheObject> enlistRead(
final GridCacheContext cacheCtx,
Collection<KeyCacheObject> keys,
- @Nullable GridCacheEntryEx cached,
@Nullable ExpiryPolicy expiryPlc,
Map<K, V> map,
Map<KeyCacheObject, GridCacheVersion> missed,
@@ -1261,7 +1360,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
) throws IgniteCheckedException {
assert !F.isEmpty(keys);
assert keysCnt == keys.size();
- assert cached == null || F.first(keys).equals(cached.key());
cacheCtx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -1271,11 +1369,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
AffinityTopologyVersion topVer = topologyVersion();
+ boolean needReadVer = serializable() && optimistic();
+
// In this loop we cover only read-committed or optimistic transactions.
// Transactions that are pessimistic and not read-committed are covered
// outside of this loop.
for (KeyCacheObject key : keys) {
- if (pessimistic() && !readCommitted() && !skipVals)
+ if ((pessimistic() || needReadVer) && !readCommitted() && !skipVals)
addActiveCache(cacheCtx);
IgniteTxKey txKey = cacheCtx.txKey(key);
@@ -1337,13 +1437,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
break;
}
- catch (GridCacheFilterFailedException e) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + txEntry);
-
- if (!readCommitted())
- txEntry.readValue(e.<V>value());
- }
catch (GridCacheEntryRemovedException ignored) {
txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
}
@@ -1359,38 +1452,49 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
lockKeys.add(key);
while (true) {
- GridCacheEntryEx entry;
-
- if (cached != null) {
- entry = cached;
-
- cached = null;
- }
- else
- entry = entryEx(cacheCtx, txKey, topVer);
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topVer);
try {
GridCacheVersion ver = entry.version();
CacheObject val = null;
+ GridCacheVersion readVer = null;
if (!pessimistic() || readCommitted() && !skipVals) {
IgniteCacheExpiryPolicy accessPlc =
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
- // This call will check for filter.
- val = entry.innerGet(this,
- /*swap*/true,
- /*no read-through*/false,
- /*fail-fast*/true,
- /*unmarshal*/true,
- /*metrics*/true,
- /*event*/true,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- accessPlc);
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ entry.innerGetVersioned(this,
+ /*swap*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc) : null;
+
+ if (res != null) {
+ val = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else {
+ val = entry.innerGet(this,
+ /*swap*/true,
+ /*no read-through*/false,
+ /*fail-fast*/true,
+ /*unmarshal*/true,
+ /*metrics*/true,
+ /*event*/true,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ accessPlc);
+ }
if (val != null) {
cacheCtx.addResult(map,
@@ -1424,8 +1528,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// As optimization, mark as checked immediately
// for non-pessimistic if value is not null.
- if (val != null && !pessimistic())
+ if (val != null && !pessimistic()) {
txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(readVer);
+ }
+ }
}
break; // While.
@@ -1434,34 +1545,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (log.isDebugEnabled())
log.debug("Got removed entry in transaction getAllAsync(..) (will retry): " + key);
}
- catch (GridCacheFilterFailedException e) {
- if (log.isDebugEnabled())
- log.debug("Filter validation failed for entry: " + entry);
-
- if (!readCommitted()) {
- // Value for which failure occurred.
- CacheObject val = e.value();
-
- txEntry = addEntry(READ,
- val,
- null,
- null,
- entry,
- expiryPlc,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore);
-
- // Mark as checked immediately for non-pessimistic.
- if (val != null && !pessimistic())
- txEntry.markValid();
- }
-
- break; // While loop.
- }
finally {
if (cacheCtx.isNear() && entry != null && readCommitted()) {
if (cacheCtx.affinity().belongs(cacheCtx.localNode(), entry.partition(), topVer)) {
@@ -1492,6 +1575,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cacheCtx Cache context.
+ * @param keys Keys.
+ * @return Expiry policy.
+ */
+ protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, Collection<KeyCacheObject> keys) {
+ return null;
+ }
+
+ /**
* Adds skipped key.
*
* @param skipped Skipped set (possibly {@code null}).
@@ -1512,12 +1604,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/**
* Loads all missed keys for
- * {@link #getAllAsync(GridCacheContext, Collection, GridCacheEntryEx, boolean, boolean, boolean, boolean)} method.
+ * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
*
* @param cacheCtx Cache context.
* @param map Return map.
* @param missedMap Missed keys.
- * @param redos Keys to retry.
* @param deserializePortable Deserialize portable flag.
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects flag.
@@ -1528,55 +1619,25 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final GridCacheContext cacheCtx,
final Map<K, V> map,
final Map<KeyCacheObject, GridCacheVersion> missedMap,
- @Nullable final Collection<KeyCacheObject> redos,
final boolean deserializePortable,
final boolean skipVals,
final boolean keepCacheObjects,
final boolean skipStore
) {
- assert redos != null || pessimistic();
-
if (log.isDebugEnabled())
log.debug("Loading missed values for missed map: " + missedMap);
- final Collection<KeyCacheObject> loaded = new HashSet<>();
+ final boolean needReadVer = serializable() && optimistic();
return new GridEmbeddedFuture<>(
- new C2<Boolean, Exception, Map<K, V>>() {
- @Override public Map<K, V> apply(Boolean b, Exception e) {
+ new C2<Void, Exception, Map<K, V>>() {
+ @Override public Map<K, V> apply(Void v, Exception e) {
if (e != null) {
setRollbackOnly();
throw new GridClosureException(e);
}
- if (!b && !readCommitted()) {
- // There is no store - we must mark the entries.
- for (KeyCacheObject key : missedMap.keySet()) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
- if (txEntry != null)
- txEntry.markValid();
- }
- }
-
- if (readCommitted()) {
- Collection<KeyCacheObject> notFound = new HashSet<>(missedMap.keySet());
-
- notFound.removeAll(loaded);
-
- // In read-committed mode touch entries that have just been read.
- for (KeyCacheObject key : notFound) {
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(key));
-
- GridCacheEntryEx entry = txEntry == null ? cacheCtx.cache().peekEx(key) :
- txEntry.cached();
-
- if (entry != null)
- cacheCtx.evicts().touch(entry, topologyVersion());
- }
- }
-
return map;
}
},
@@ -1585,13 +1646,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
!skipStore,
false,
missedMap.keySet(),
- deserializePortable,
skipVals,
- new CI2<KeyCacheObject, Object>() {
- /** */
- private GridCacheVersion nextVer;
-
- @Override public void apply(KeyCacheObject key, Object val) {
+ needReadVer,
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) {
if (isRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Ignoring loaded value for read because transaction was rolled back: " +
@@ -1600,15 +1658,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return;
}
- GridCacheVersion ver = missedMap.get(key);
-
- if (ver == null) {
- if (log.isDebugEnabled())
- log.debug("Value from storage was never asked for [key=" + key + ", val=" + val + ']');
-
- return;
- }
-
CacheObject cacheVal = cacheCtx.toCacheObject(val);
CacheObject visibleVal = cacheVal;
@@ -1625,90 +1674,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
visibleVal = txEntry.applyEntryProcessors(visibleVal);
}
- // In pessimistic mode we hold the lock, so filter validation
- // should always be valid.
- if (pessimistic())
- ver = null;
-
- // Initialize next version.
- if (nextVer == null)
- nextVer = cctx.versions().next(topologyVersion());
-
- while (true) {
- assert txEntry != null || readCommitted() || skipVals;
-
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
-
- try {
- // Must initialize to true since even if filter didn't pass,
- // we still record the transaction value.
- boolean set;
-
- try {
- set = e.versionedValue(cacheVal, ver, nextVer);
- }
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction getAll method " +
- "(will try again): " + e);
-
- if (pessimistic() && !readCommitted() && !isRollbackOnly()) {
- U.error(log, "Inconsistent transaction state (entry got removed while " +
- "holding lock) [entry=" + e + ", tx=" + IgniteTxLocalAdapter.this + "]");
-
- setRollbackOnly();
-
- return;
- }
-
- if (txEntry != null)
- txEntry.cached(entryEx(cacheCtx, txKey));
-
- continue; // While loop.
- }
-
- // In pessimistic mode, we should always be able to set.
- assert set || !pessimistic();
-
- if (readCommitted() || skipVals) {
- cacheCtx.evicts().touch(e, topologyVersion());
+ assert txEntry != null || readCommitted() || skipVals;
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
- }
- }
- else {
- assert txEntry != null;
+ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
- txEntry.setAndMarkValid(cacheVal);
+ if (readCommitted() || skipVals) {
+ cacheCtx.evicts().touch(e, topologyVersion());
- if (visibleVal != null) {
- cacheCtx.addResult(map,
- key,
- visibleVal,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
- }
- }
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
+ }
+ }
+ else {
+ assert txEntry != null;
- loaded.add(key);
+ txEntry.setAndMarkValid(cacheVal);
- if (log.isDebugEnabled())
- log.debug("Set value loaded from store into entry from transaction [set=" + set +
- ", matchVer=" + ver + ", newVer=" + nextVer + ", entry=" + e + ']');
+ if (needReadVer) {
+ assert loadVer != null;
- break; // While loop.
+ txEntry.serializableReadVersion(loadVer);
}
- catch (IgniteCheckedException ex) {
- throw new IgniteException("Failed to put value for cache entry: " + e, ex);
+
+ if (visibleVal != null) {
+ cacheCtx.addResult(map,
+ key,
+ visibleVal,
+ skipVals,
+ keepCacheObjects,
+ deserializePortable,
+ false);
}
}
}
@@ -1720,7 +1721,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
@Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
Collection<KeyCacheObject> keys,
- @Nullable GridCacheEntryEx cached,
final boolean deserializePortable,
final boolean skipVals,
final boolean keepCacheObjects,
@@ -1747,7 +1747,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
keys,
- cached,
expiryPlc,
retMap,
missed,
@@ -1850,20 +1849,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.cached(entryEx(cacheCtx, txKey));
}
- catch (GridCacheFilterFailedException e) {
- // Failed value for the filter.
- CacheObject val = e.value();
-
- if (val != null) {
- // If filter fails after lock is acquired, we don't reload,
- // regardless if value is null or not.
- missed.remove(cacheKey);
-
- txEntry.setAndMarkValid(val);
- }
-
- break; // While.
- }
}
}
@@ -1871,7 +1856,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return checkMissed(cacheCtx,
retMap,
missed,
- null,
deserializePortable,
skipVals,
keepCacheObjects,
@@ -1920,8 +1904,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else {
assert optimistic() || readCommitted() || skipVals;
- final Collection<KeyCacheObject> redos = new ArrayList<>();
-
if (!missed.isEmpty()) {
if (!readCommitted())
for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) {
@@ -1937,67 +1919,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (missed.isEmpty())
return new GridFinishedFuture<>(retMap);
- IgniteInternalFuture<Map<K, V>> fut0 = checkMissed(cacheCtx,
+ return checkMissed(cacheCtx,
retMap,
missed,
- redos,
deserializePortable,
skipVals,
keepCacheObjects,
skipStore);
-
- return new GridEmbeddedFuture<>(
- // First future.
- fut0,
- // Closure that returns another future, based on result from first.
- new PMC<Map<K, V>>() {
- @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) {
- if (redos.isEmpty())
- return new GridFinishedFuture<>(
- Collections.<K, V>emptyMap());
-
- if (log.isDebugEnabled())
- log.debug("Starting to future-recursively get values for keys: " + redos);
-
- // Future recursion.
- return getAllAsync(cacheCtx,
- redos,
- null,
- deserializePortable,
- skipVals,
- true,
- skipStore);
- }
- },
- // Finalize.
- new FinishClosure<Map<K, V>>() {
- @Override Map<K, V> finish(Map<K, V> loaded) {
- for (Map.Entry<K, V> entry : loaded.entrySet()) {
- KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey();
-
- IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey));
-
- CacheObject val = (CacheObject)entry.getValue();
-
- if (!readCommitted())
- txEntry.readValue(val);
-
- if (!F.isEmpty(txEntry.entryProcessors()))
- val = txEntry.applyEntryProcessors(val);
-
- cacheCtx.addResult(retMap,
- cacheKey,
- val,
- skipVals,
- keepCacheObjects,
- deserializePortable,
- false);
- }
-
- return retMap;
- }
- }
- );
}
return new GridFinishedFuture<>(retMap);
@@ -2016,8 +1944,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
GridCacheContext cacheCtx,
Map<? extends K, ? extends V> map,
boolean retval,
- @Nullable GridCacheEntryEx cached,
- long ttl,
CacheEntryPredicate[] filter
) {
return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
@@ -2026,7 +1952,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
null,
retval,
- cached,
filter);
}
@@ -2041,7 +1966,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
drMap,
false,
- null,
null);
}
@@ -2058,7 +1982,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
invokeArgs,
null,
true,
- null,
null);
}
@@ -2067,20 +1990,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
GridCacheContext cacheCtx,
Map<KeyCacheObject, GridCacheVersion> drMap
) {
- return removeAllAsync0(cacheCtx, null, drMap, null, false, null);
+ return removeAllAsync0(cacheCtx, null, drMap, false, null, false);
}
/**
* Checks filter for non-pessimistic transactions.
*
- * @param cached Cached entry.
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
* @param filter Filter to check.
* @return {@code True} if passed or pessimistic.
- * @throws IgniteCheckedException If failed.
*/
- private <K, V> boolean filter(GridCacheEntryEx cached,
- CacheEntryPredicate[] filter) throws IgniteCheckedException {
- return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
+ private boolean filter(
+ GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ CacheEntryPredicate[] filter) {
+ return pessimistic() || (optimistic() && implicit()) || isAll(cctx, key, val, filter);
}
/**
@@ -2088,7 +2015,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
*
* @param cacheCtx Cache context.
* @param keys Keys to enlist.
- * @param cached Cached entry.
* @param expiryPlc Explicitly specified expiry policy for entry.
* @param implicit Implicit flag.
* @param lookup Value lookup map ({@code null} for remove).
@@ -2102,28 +2028,28 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param drPutMap DR put map (optional).
* @param drRmvMap DR remove map (optional).
* @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
* @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
*/
- protected <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+ private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
final GridCacheContext cacheCtx,
Collection<?> keys,
- @Nullable GridCacheEntryEx cached,
@Nullable ExpiryPolicy expiryPlc,
boolean implicit,
@Nullable Map<?, ?> lookup,
@Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
@Nullable Object[] invokeArgs,
- boolean retval,
+ final boolean retval,
boolean lockOnly,
- CacheEntryPredicate[] filter,
+ final CacheEntryPredicate[] filter,
final GridCacheReturn ret,
Collection<KeyCacheObject> enlisted,
@Nullable Map<KeyCacheObject, GridCacheDrInfo> drPutMap,
@Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
- boolean skipStore
+ boolean skipStore,
+ final boolean singleRmv
) {
- assert cached == null || keys.size() == 1;
- assert cached == null || F.first(keys).equals(cached.key());
+ assert retval || invokeMap == null;
try {
addActiveCache(cacheCtx);
@@ -2138,6 +2064,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
Set<KeyCacheObject> missedForLoad = null;
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
try {
// Set transform flag for transaction.
if (invokeMap != null)
@@ -2194,19 +2124,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
// First time access.
if (txEntry == null) {
while (true) {
- GridCacheEntryEx entry = null;
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
try {
- if (cached != null) {
- entry = cached;
-
- cached = null;
- }
- else {
- entry = entryEx(cacheCtx, txKey, topologyVersion());
-
- entry.unswap(false);
- }
+ entry.unswap(false);
// Check if lock is being explicitly acquired by the same thread.
if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
@@ -2217,45 +2138,57 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
", locNodeId=" + cctx.localNodeId() + ']');
CacheObject old = null;
-
- boolean readThrough = !skipStore && !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ GridCacheVersion readVer = null;
if (optimistic() && !implicit()) {
try {
- // Should read through if filter is specified.
- old = entry.innerGet(this,
- /*swap*/false,
- /*read-through*/readThrough && cacheCtx.loadPreviousValue(),
- /*fail-fast*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null);
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ entry.innerGetVersioned(this,
+ /*swap*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null) : null;
+
+ if (res != null) {
+ old = res.get1();
+ readVer = res.get2();
+ }
+ }
+ else {
+ old = entry.innerGet(this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null);
+ }
}
catch (ClusterTopologyCheckedException e) {
entry.context().evicts().touch(entry, topologyVersion());
throw e;
}
- catch (GridCacheFilterFailedException e) {
- e.printStackTrace();
-
- assert false : "Empty filter failed: " + e;
- }
}
else
old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
- if (!filter(entry, filter)) {
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
skipped = skip(skipped, cacheKey);
ret.set(cacheCtx, old, false);
- if (!readCommitted() && old != null) {
+ if (!readCommitted()) {
// Enlist failed filters as reads for non-read-committed mode,
// so future ops will get the same values.
txEntry = addEntry(READ,
@@ -2272,9 +2205,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
skipStore);
txEntry.markValid();
+
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
}
- if (readCommitted() || old == null)
+ if (readCommitted())
cacheCtx.evicts().touch(entry, topologyVersion());
break; // While.
@@ -2305,9 +2244,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
txEntry.markValid();
if (old == null) {
- boolean load = retval && !readThrough;
-
- if (load) {
+ if (needVal) {
if (missedForLoad == null)
missedForLoad = new HashSet<>();
@@ -2324,6 +2261,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
else {
+ if (needReadVer) {
+ assert readVer != null;
+
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
+
if (retval && !transform)
ret.set(cacheCtx, old, true);
else {
@@ -2369,7 +2312,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
else {
if (entryProcessor == null && txEntry.op() == TRANSFORM)
throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after transform closure is applied): " + key);
+ "transaction after EntryProcessor is applied): " + key);
GridCacheEntryEx entry = txEntry.cached();
@@ -2378,7 +2321,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
boolean del = txEntry.op() == DELETE && rmv;
if (!del) {
- if (!filter(entry, filter)) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
skipped = skip(skipped, cacheKey);
ret.set(cacheCtx, v, false);
@@ -2439,15 +2382,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
if (missedForLoad != null) {
- IgniteInternalFuture<Boolean> fut = loadMissing(
+ final boolean skipVals = singleRmv;
+
+ IgniteInternalFuture<Void> fut = loadMissing(
cacheCtx,
/*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
/*async*/true,
missedForLoad,
- deserializePortables(cacheCtx),
- /*skip values*/false,
- new CI2<KeyCacheObject, Object>() {
- @Override public void apply(KeyCacheObject key, Object val) {
+ skipVals,
+ needReadVer,
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
if (log.isDebugEnabled())
log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
@@ -2455,33 +2402,50 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert e != null;
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
+ if (needReadVer) {
+ assert loadVer != null;
- if (e.op() == TRANSFORM) {
- GridCacheVersion ver;
+ e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
- try {
- ver = e.cached().version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : e;
+ if (singleRmv) {
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+ ret.set(cacheCtx, null, val != null);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
- ver = null;
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
+
+ try {
+ ver = e.cached().version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+
+ addInvokeResult(e, cacheVal, ret, ver);
}
+ else {
+ boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
- addInvokeResult(e, cacheVal, ret, ver);
+ ret.set(cacheCtx, cacheVal, success);
+ }
}
- else
- ret.set(cacheCtx, cacheVal, true);
}
});
return new GridEmbeddedFuture<>(
- new C2<Boolean, Exception, Set<KeyCacheObject>>() {
- @Override public Set<KeyCacheObject> apply(Boolean b, Exception e) {
+ new C2<Void, Exception, Set<KeyCacheObject>>() {
+ @Override public Set<KeyCacheObject> apply(Void b, Exception e) {
if (e != null)
throw new GridClosureException(e);
@@ -2495,6 +2459,31 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cctx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param filter Filter.
+ * @return {@code True} if filter passed.
+ */
+ private boolean isAll(GridCacheContext cctx,
+ KeyCacheObject key,
+ CacheObject val,
+ CacheEntryPredicate[] filter) {
+ GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) {
+ @Nullable @Override public CacheObject peekVisibleValue() {
+ return rawGet();
+ }
+ };
+
+ for (CacheEntryPredicate p0 : filter) {
+ if (p0 != null && !p0.apply(e))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* Post lock processing for put or remove.
*
* @param cacheCtx Context.
@@ -2555,29 +2544,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (retval || invoke) {
if (!cacheCtx.isNear()) {
- try {
- if (!hasPrevVal) {
- boolean readThrough =
- (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
-
- v = cached.innerGet(this,
- /*swap*/true,
- readThrough,
- /*failFast*/false,
- /*unmarshal*/true,
- /*metrics*/!invoke,
- /*event*/!invoke && !dht(),
- /*temporary*/false,
- CU.subjectId(this, cctx),
- null,
- resolveTaskName(),
- null);
- }
- }
- catch (GridCacheFilterFailedException e) {
- e.printStackTrace();
-
- assert false : "Empty filter failed: " + e;
+ if (!hasPrevVal) {
+ boolean readThrough =
+ (invoke || cacheCtx.loadPreviousValue()) && !txEntry.skipStore();
+
+ v = cached.innerGet(this,
+ /*swap*/true,
+ readThrough,
+ /*failFast*/false,
+ /*unmarshal*/true,
+ /*metrics*/!invoke,
+ /*event*/!invoke && !dht(),
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ null,
+ resolveTaskName(),
+ null);
}
}
else {
@@ -2725,7 +2707,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param invokeArgs Optional arguments for EntryProcessor.
* @param drMap DR map.
* @param retval Key-transform value map to store.
- * @param cached Cached entry, if any.
* @param filter Filter.
* @return Operation future.
*/
@@ -2737,7 +2718,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
@Nullable final Object[] invokeArgs,
@Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
final boolean retval,
- @Nullable GridCacheEntryEx cached,
@Nullable final CacheEntryPredicate[] filter
) {
assert filter == null || invokeMap == null;
@@ -2778,8 +2758,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
assert map0 != null || invokeMap0 != null;
- assert cached == null ||
- (map0 != null && map0.size() == 1) || (invokeMap0 != null && invokeMap0.size() == 1);
try {
checkValid();
@@ -2814,7 +2792,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
cacheCtx,
keySet,
- cached,
opCtx != null ? opCtx.expiry() : null,
implicit,
map0,
@@ -2827,7 +2804,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
enlisted,
drMap,
null,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ false);
if (pessimistic()) {
// Loose all skipped.
@@ -2921,8 +2899,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
else
return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override
- public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
f.get();
return ret;
@@ -2951,11 +2928,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
@Override public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
GridCacheContext cacheCtx,
Collection<? extends K> keys,
- @Nullable GridCacheEntryEx cached,
boolean retval,
- CacheEntryPredicate[] filter
+ CacheEntryPredicate[] filter,
+ boolean singleRmv
) {
- return removeAllAsync0(cacheCtx, keys, null, cached, retval, filter);
+ return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv);
}
/**
@@ -2963,8 +2940,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param keys Keys to remove.
* @param drMap DR map.
* @param retval Flag indicating whether a value should be returned.
- * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1.
* @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
* @return Future for asynchronous remove.
*/
@SuppressWarnings("unchecked")
@@ -2972,9 +2949,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final GridCacheContext cacheCtx,
@Nullable final Collection<? extends K> keys,
@Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
- @Nullable GridCacheEntryEx cached,
final boolean retval,
- @Nullable final CacheEntryPredicate[] filter) {
+ @Nullable final CacheEntryPredicate[] filter,
+ boolean singleRmv) {
try {
checkUpdatesAllowed(cacheCtx);
}
@@ -2998,7 +2975,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
keys0 = keys;
assert keys0 != null;
- assert cached == null || keys0.size() == 1;
if (log.isDebugEnabled())
log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit +
@@ -3043,7 +3019,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
cacheCtx,
keys0,
- /** cached entry */null,
plc,
implicit,
/** lookup map */null,
@@ -3056,7 +3031,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
enlisted,
null,
drMap,
- opCtx != null && opCtx.skipStore()
+ opCtx != null && opCtx.skipStore(),
+ singleRmv
);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 6f72290..0d83338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -19,17 +19,17 @@ package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
import java.util.Map;
+import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,8 +64,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
/**
* @param cacheCtx Cache context.
* @param keys Keys to get.
- * @param cached Cached entry if this method is called from entry wrapper
- * Cached entry is passed if and only if there is only one key in collection of keys.
* @param deserializePortable Deserialize portable flag.
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects
@@ -75,7 +73,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
GridCacheContext cacheCtx,
Collection<KeyCacheObject> keys,
- @Nullable GridCacheEntryEx cached,
boolean deserializePortable,
boolean skipVals,
boolean keepCacheObjects,
@@ -85,17 +82,13 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @param cacheCtx Cache context.
* @param map Map to put.
* @param retval Flag indicating whether a value should be returned.
- * @param cached Cached entry, if any. Will be provided only if map has size 1.
* @param filter Filter.
- * @param ttl Time to live for entry. If negative, leave unchanged.
* @return Future for put operation.
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
GridCacheContext cacheCtx,
Map<? extends K, ? extends V> map,
boolean retval,
- @Nullable GridCacheEntryEx cached,
- long ttl,
CacheEntryPredicate[] filter);
/**
@@ -113,16 +106,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @param cacheCtx Cache context.
* @param keys Keys to remove.
* @param retval Flag indicating whether a value should be returned.
- * @param cached Cached entry, if any. Will be provided only if size of keys collection is 1.
* @param filter Filter.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
* @return Future for asynchronous remove.
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
GridCacheContext cacheCtx,
Collection<? extends K> keys,
- @Nullable GridCacheEntryEx cached,
boolean retval,
- CacheEntryPredicate[] filter);
+ CacheEntryPredicate[] filter,
+ boolean singleRmv);
/**
* @param cacheCtx Cache context.
@@ -161,17 +154,17 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
* @param readThrough Read through flag.
* @param async if {@code True}, then loading will happen in a separate thread.
* @param keys Keys.
- * @param c Closure.
- * @param deserializePortable Deserialize portable flag.
* @param skipVals Skip values flag.
+ * @param needVer If {@code true} version is required for loaded values.
+ * @param c Closure to be applied for loaded values.
* @return Future with {@code True} value if loading took place.
*/
- public IgniteInternalFuture<Boolean> loadMissing(
+ public IgniteInternalFuture<Void> loadMissing(
GridCacheContext cacheCtx,
boolean readThrough,
boolean async,
Collection<KeyCacheObject> keys,
- boolean deserializePortable,
boolean skipVals,
- IgniteBiInClosure<KeyCacheObject, Object> c);
+ boolean needVer,
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);
}
\ No newline at end of file