You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 09:19:39 UTC
[04/50] [abbrv] ignite git commit: 'Single' operations optimizations
for tx cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2c7bf8a..758f82c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -66,9 +66,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -123,18 +121,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** */
private static final long serialVersionUID = 0L;
- /** Per-transaction read map. */
- @GridToStringInclude
- protected Map<IgniteTxKey, IgniteTxEntry> txMap;
-
- /** Read view on transaction map. */
- @GridToStringExclude
- protected IgniteTxMap readView;
-
- /** Write view on transaction map. */
- @GridToStringExclude
- protected IgniteTxMap writeView;
-
/** Minimal version encountered (either explicit lock or XID of this transaction). */
protected GridCacheVersion minVer;
@@ -156,9 +142,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** Commit error. */
protected AtomicReference<Throwable> commitErr = new AtomicReference<>();
- /** Active cache IDs. */
- protected Set<Integer> activeCacheIds = new HashSet<>();
-
/** Need return value. */
protected boolean needRetVal;
@@ -168,6 +151,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** Flag indicating whether deployment is enabled for caches from this transaction or not. */
private boolean depEnabled;
+ /** */
+ @GridToStringInclude
+ protected IgniteTxLocalState txState;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -209,7 +196,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cctx,
xidVer,
implicit,
- implicitSingle,
/*local*/true,
sys,
plc,
@@ -225,6 +211,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
);
minVer = xidVer;
+
+ txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxState txState() {
+ return txState;
}
/**
@@ -246,7 +239,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public boolean empty() {
- return txMap.isEmpty();
+ return txState.empty();
}
/** {@inheritDoc} */
@@ -267,16 +260,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
assert false;
- return false;
- }
- /**
- * Gets collection of active cache IDs for this transaction.
- *
- * @return Collection of active cache IDs.
- */
- @Override public Collection<Integer> activeCacheIds() {
- return activeCacheIds;
+ return false;
}
/** {@inheritDoc} */
@@ -284,77 +269,70 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return depEnabled;
}
+ /**
+ * @param depEnabled Flag indicating whether deployment is enabled for caches from this transaction or not.
+ */
+ public void activeCachesDeploymentEnabled(boolean depEnabled) {
+ this.depEnabled = depEnabled;
+ }
+
/** {@inheritDoc} */
@Override public boolean isStarted() {
- return txMap != null;
+ return txState.initialized();
}
/** {@inheritDoc} */
@Override public boolean hasWriteKey(IgniteTxKey key) {
- return writeView.containsKey(key);
+ return txState.hasWriteKey(key);
}
/**
* @return Transaction read set.
*/
@Override public Set<IgniteTxKey> readSet() {
- return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet();
+ return txState.readSet();
}
/**
* @return Transaction write set.
*/
@Override public Set<IgniteTxKey> writeSet() {
- return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet();
- }
-
- /** {@inheritDoc} */
- @Override public boolean removed(IgniteTxKey key) {
- if (txMap == null)
- return false;
-
- IgniteTxEntry e = txMap.get(key);
-
- return e != null && e.op() == DELETE;
+ return txState.writeSet();
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
- return readView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView;
+ return txState.readMap();
}
/** {@inheritDoc} */
@Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
- return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView;
+ return txState.writeMap();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> allEntries() {
- return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values();
+ return txState.allEntries();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> readEntries() {
- return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values();
+ return txState.readEntries();
}
/** {@inheritDoc} */
@Override public Collection<IgniteTxEntry> writeEntries() {
- return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values();
+ return txState.writeEntries();
}
/** {@inheritDoc} */
@Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) {
- return txMap == null ? null : txMap.get(key);
+ return txState.entry(key);
}
/** {@inheritDoc} */
@Override public void seal() {
- if (readView != null)
- readView.seal();
-
- if (writeView != null)
- writeView.seal();
+ txState.seal();
}
/** {@inheritDoc} */
@@ -409,7 +387,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
KeyCacheObject key,
CacheEntryPredicate[] filter
) throws GridCacheFilterFailedException {
- IgniteTxEntry e = txMap == null ? null : txMap.get(cacheCtx.txKey(key));
+ IgniteTxEntry e = entry(cacheCtx.txKey(key));
if (e != null) {
// We should look at tx entry previous value. If this is a user peek then previous
@@ -652,7 +630,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!storeEnabled() || internal())
return;
- Collection<CacheStoreManager> stores = stores();
+ Collection<CacheStoreManager> stores = txState.stores(cctx);
if (stores == null || stores.isEmpty())
return;
@@ -854,7 +832,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
checkValid();
- boolean empty = F.isEmpty(near() ? txMap : writeMap());
+ Collection<IgniteTxEntry> commitEntries = near() ? allEntries() : writeEntries();
+
+ boolean empty = F.isEmpty(commitEntries);
// Register this transaction as completed prior to write-phase to
// ensure proper lock ordering for removed entries.
@@ -874,7 +854,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/*
* Commit to cache. Note that for 'near' transaction we loop through all the entries.
*/
- for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
+ for (IgniteTxEntry txEntry : commitEntries) {
GridCacheContext cacheCtx = txEntry.context();
GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE;
@@ -1282,7 +1262,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cctx.tm().rollbackTx(this);
if (!internal()) {
- Collection<CacheStoreManager> stores = stores();
+ Collection<CacheStoreManager> stores = txState.stores(cctx);
if (stores != null && !stores.isEmpty()) {
assert isWriteToStoreFromDhtValid(stores) :
@@ -1582,25 +1562,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
- * Adds skipped key.
- *
- * @param skipped Skipped set (possibly {@code null}).
- * @param key Key to add.
- * @return Skipped set.
- */
- private Set<KeyCacheObject> skip(Set<KeyCacheObject> skipped, KeyCacheObject key) {
- if (skipped == null)
- skipped = new GridLeanSet<>();
-
- skipped.add(key);
-
- if (log.isDebugEnabled())
- log.debug("Added key to skipped set: " + key);
-
- return skipped;
- }
-
- /**
* Loads all missed keys for
* {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method.
*
@@ -1954,6 +1915,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
+ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+ GridCacheContext cacheCtx,
+ K key,
+ V val,
+ boolean retval,
+ CacheEntryPredicate[] filter) {
+ return putAsync0(cacheCtx, key, val, null, null, retval, filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+ K key,
+ EntryProcessor<K, V, Object> entryProcessor,
+ Object... invokeArgs) {
+ return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null);
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllDrAsync(
GridCacheContext cacheCtx,
Map<KeyCacheObject, GridCacheDrInfo> drMap
@@ -2009,12 +1988,88 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key to enlist.
+ * @param val Value.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param entryProcessor Entry processor (for invoke operation).
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter User filters.
+ * @param ret Return value.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}).
+ * @return Future for entry values loading.
+ */
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
+ final GridCacheContext cacheCtx,
+ KeyCacheObject cacheKey,
+ Object val,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable EntryProcessor<K, V, Object> entryProcessor,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ boolean skipStore,
+ final boolean singleRmv) {
+ try {
+ addActiveCache(cacheCtx);
+
+ final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
+ final boolean needVal = singleRmv || retval || hasFilters;
+ final boolean needReadVer = needVal && (serializable() && optimistic());
+
+ if (entryProcessor != null)
+ transform = true;
+
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ /*drVer*/null,
+ /*drTtl*/-1L,
+ /*drExpireTime*/-1L,
+ ret,
+ /*enlisted*/null,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer);
+
+ if (loadMissed) {
+ return loadMissing(cacheCtx,
+ Collections.singleton(cacheKey),
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ skipStore,
+ retval);
+ }
+
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ /**
* Internal routine for <tt>putAll(..)</tt>
*
* @param cacheCtx Cache context.
* @param keys Keys to enlist.
* @param expiryPlc Explicitly specified expiry policy for entry.
- * @param implicit Implicit flag.
* @param lookup Value lookup map ({@code null} for remove).
* @param invokeMap Map with entry processors for invoke operation.
* @param invokeArgs Optional arguments for EntryProcessor.
@@ -2027,13 +2082,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @param drRmvMap DR remove map (optional).
* @param skipStore Skip store flag.
* @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
- * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions).
+ * @return Future for missing values loading.
*/
- private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite(
+ private <K, V> IgniteInternalFuture<Void> enlistWrite(
final GridCacheContext cacheCtx,
Collection<?> keys,
@Nullable ExpiryPolicy expiryPlc,
- boolean implicit,
@Nullable Map<?, ?> lookup,
@Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap,
@Nullable Object[] invokeArgs,
@@ -2056,8 +2110,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
return new GridFinishedFuture<>(e);
}
- Set<KeyCacheObject> skipped = null;
-
boolean rmv = lookup == null && invokeMap == null;
Set<KeyCacheObject> missedForLoad = null;
@@ -2115,345 +2167,441 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
- IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
-
- IgniteTxEntry txEntry = entry(txKey);
-
- // First time access.
- if (txEntry == null) {
- while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
-
- try {
- entry.unswap(false);
-
- // Check if lock is being explicitly acquired by the same thread.
- if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
- entry.lockedByThread(threadId, xidVer))
- throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
- "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer +
- ", threadId=" + threadId +
- ", locNodeId=" + cctx.localNodeId() + ']');
-
- CacheObject old = null;
- GridCacheVersion readVer = null;
+ boolean loadMissed = enlistWriteEntry(cacheCtx,
+ cacheKey,
+ val,
+ entryProcessor,
+ invokeArgs,
+ expiryPlc,
+ retval,
+ lockOnly,
+ filter,
+ drVer,
+ drTtl,
+ drExpireTime,
+ ret,
+ enlisted,
+ skipStore,
+ singleRmv,
+ hasFilters,
+ needVal,
+ needReadVer);
+
+ if (loadMissed) {
+ if (missedForLoad == null)
+ missedForLoad = new HashSet<>();
+
+ missedForLoad.add(cacheKey);
+ }
+ }
- if (optimistic() && !implicit()) {
- try {
- if (needReadVer) {
- T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
- entry.innerGetVersioned(this,
- /*swap*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null) : null;
+ if (missedForLoad != null) {
+ return loadMissing(cacheCtx,
+ missedForLoad,
+ filter,
+ ret,
+ needReadVer,
+ singleRmv,
+ hasFilters,
+ skipStore,
+ retval);
+ }
- if (res != null) {
- old = res.get1();
- readVer = res.get2();
- }
- }
- else {
- old = entry.innerGet(this,
- /*swap*/false,
- /*read-through*/false,
- /*fail-fast*/false,
- /*unmarshal*/retval,
- /*metrics*/retval,
- /*events*/retval,
- /*temporary*/false,
- CU.subjectId(this, cctx),
- entryProcessor,
- resolveTaskName(),
- null);
- }
- }
- catch (ClusterTopologyCheckedException e) {
- entry.context().evicts().touch(entry, topologyVersion());
+ return new GridFinishedFuture<>();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
- throw e;
- }
- }
- else
- old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
+ /**
+ * @param cacheCtx Cache context.
+ * @param keys Keys to load.
+ * @param ret Return value.
+ * @param needReadVer Read version flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param skipStore Skip store flag.
+ * @param retval Return value flag.
+ * @return Load future.
+ */
+ private IgniteInternalFuture<Void> loadMissing(
+ final GridCacheContext cacheCtx,
+ final Set<KeyCacheObject> keys,
+ final CacheEntryPredicate[] filter,
+ final GridCacheReturn ret,
+ final boolean needReadVer,
+ final boolean singleRmv,
+ final boolean hasFilters,
+ final boolean skipStore,
+ final boolean retval) {
+ GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
+ new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
+ @Override public void apply(KeyCacheObject key,
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
+ if (log.isDebugEnabled())
+ log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
- if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
- skipped = skip(skipped, cacheKey);
+ IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
- ret.set(cacheCtx, old, false);
+ assert e != null;
- if (!readCommitted()) {
- // Enlist failed filters as reads for non-read-committed mode,
- // so future ops will get the same values.
- txEntry = addEntry(READ,
- old,
- null,
- null,
- entry,
- null,
- CU.empty0(),
- false,
- -1L,
- -1L,
- null,
- skipStore);
+ if (needReadVer) {
+ assert loadVer != null;
- txEntry.markValid();
+ e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ }
- if (needReadVer) {
- assert readVer != null;
+ if (singleRmv) {
+ assert !hasFilters && !retval;
+ assert val == null || Boolean.TRUE.equals(val) : val;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
- }
+ ret.set(cacheCtx, null, val != null);
+ }
+ else {
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
- if (readCommitted())
- cacheCtx.evicts().touch(entry, topologyVersion());
+ if (e.op() == TRANSFORM) {
+ GridCacheVersion ver;
- break; // While.
+ try {
+ ver = e.cached().version();
}
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : e;
- final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
- entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore);
+ ver = null;
+ }
- if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
- cacheCtx.evicts().touch(entry, topologyVersion());
+ addInvokeResult(e, cacheVal, ret, ver);
+ }
+ else {
+ boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
- enlisted.add(cacheKey);
+ ret.set(cacheCtx, cacheVal, success);
+ }
+ }
+ }
+ };
- if (!pessimistic() && !implicit()) {
- txEntry.markValid();
+ return loadMissing(
+ cacheCtx,
+ /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
+ /*async*/true,
+ keys,
+ /*skipVals*/singleRmv,
+ needReadVer,
+ c);
+ }
- if (old == null) {
- if (needVal) {
- if (missedForLoad == null)
- missedForLoad = new HashSet<>();
+ /**
+ * @param cacheCtx Cache context.
+ * @param cacheKey Key.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param expiryPlc Explicitly specified expiry policy for entry.
+ * @param retval Return value flag.
+ * @param lockOnly If {@code true}, then entry will be enlisted as noop.
+ * @param filter Filter.
+ * @param drVer DR version.
+ * @param drTtl DR ttl.
+ * @param drExpireTime DR expire time.
+ * @param ret Return value.
+ * @param enlisted Enlisted keys collection.
+ * @param skipStore Skip store flag.
+ * @param singleRmv {@code True} for single remove operation.
+ * @param hasFilters {@code True} if filters not empty.
+ * @param needVal {@code True} if value is needed.
+ * @param needReadVer {@code True} if need read entry version.
+ * @return {@code True} if entry value should be loaded.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ final KeyCacheObject cacheKey,
+ final @Nullable Object val,
+ final @Nullable EntryProcessor<?, ?, ?> entryProcessor,
+ final @Nullable Object[] invokeArgs,
+ final @Nullable ExpiryPolicy expiryPlc,
+ final boolean retval,
+ final boolean lockOnly,
+ final CacheEntryPredicate[] filter,
+ final GridCacheVersion drVer,
+ final long drTtl,
+ long drExpireTime,
+ final GridCacheReturn ret,
+ @Nullable final Collection<KeyCacheObject> enlisted,
+ boolean skipStore,
+ boolean singleRmv,
+ boolean hasFilters,
+ final boolean needVal,
+ boolean needReadVer
+ ) throws IgniteCheckedException {
+ boolean loadMissed = false;
- missedForLoad.add(cacheKey);
- }
- else {
- assert !implicit() || !transform : this;
- assert txEntry.op() != TRANSFORM : txEntry;
+ final boolean rmv = val == null && entryProcessor == null;
- if (retval)
- ret.set(cacheCtx, null, true);
- else
- ret.success(true);
- }
- }
- else {
- if (needReadVer) {
- assert readVer != null;
+ IgniteTxKey txKey = cacheCtx.txKey(cacheKey);
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
- }
+ IgniteTxEntry txEntry = entry(txKey);
- if (retval && !transform)
- ret.set(cacheCtx, old, true);
- else {
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
+ // First time access.
+ if (txEntry == null) {
+ while (true) {
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : txEntry;
+ try {
+ entry.unswap(false);
+
+ // Check if lock is being explicitly acquired by the same thread.
+ if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
+ entry.lockedByThread(threadId, xidVer)) {
+ throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
+ "externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
+ ", entry=" + entry +
+ ", xidVer=" + xidVer +
+ ", threadId=" + threadId +
+ ", locNodeId=" + cctx.localNodeId() + ']');
+ }
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version " +
- "[err=" + ex.getMessage() + ']');
+ CacheObject old = null;
+ GridCacheVersion readVer = null;
- ver = null;
- }
+ if (optimistic() && !implicit()) {
+ try {
+ if (needReadVer) {
+ T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ entry.innerGetVersioned(this,
+ /*swap*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null) : null;
- addInvokeResult(txEntry, old, ret, ver);
- }
- else
- ret.success(true);
- }
+ if (res != null) {
+ old = res.get1();
+ readVer = res.get2();
}
}
- // Pessimistic.
else {
- if (retval && !transform)
- ret.set(cacheCtx, old, true);
- else
- ret.success(true);
+ old = entry.innerGet(this,
+ /*swap*/false,
+ /*read-through*/false,
+ /*fail-fast*/false,
+ /*unmarshal*/retval,
+ /*metrics*/retval,
+ /*events*/retval,
+ /*temporary*/false,
+ CU.subjectId(this, cctx),
+ entryProcessor,
+ resolveTaskName(),
+ null);
}
-
- break; // While.
}
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ catch (ClusterTopologyCheckedException e) {
+ entry.context().evicts().touch(entry, topologyVersion());
+
+ throw e;
}
}
- }
- else {
- if (entryProcessor == null && txEntry.op() == TRANSFORM)
- throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
- "transaction after EntryProcessor is applied): " + key);
+ else
+ old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
- GridCacheEntryEx entry = txEntry.cached();
+ if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
+ ret.set(cacheCtx, old, false);
- CacheObject v = txEntry.value();
-
- boolean del = txEntry.op() == DELETE && rmv;
+ if (!readCommitted()) {
+ // Enlist failed filters as reads for non-read-committed mode,
+ // so future ops will get the same values.
+ txEntry = addEntry(READ,
+ old,
+ null,
+ null,
+ entry,
+ null,
+ CU.empty0(),
+ false,
+ -1L,
+ -1L,
+ null,
+ skipStore);
- if (!del) {
- if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
- skipped = skip(skipped, cacheKey);
+ txEntry.markValid();
- ret.set(cacheCtx, v, false);
+ if (needReadVer) {
+ assert readVer != null;
- continue;
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ }
}
- GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
- v != null ? UPDATE : CREATE;
+ if (readCommitted())
+ cacheCtx.evicts().touch(entry, topologyVersion());
- txEntry = addEntry(op,
- cacheCtx.toCacheObject(val),
- entryProcessor,
- invokeArgs,
- entry,
- expiryPlc,
- filter,
- true,
- drTtl,
- drExpireTime,
- drVer,
- skipStore);
+ break; // While.
+ }
+
+ final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+ entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore);
+ if (!implicit() && readCommitted() && !cacheCtx.offheapTiered())
+ cacheCtx.evicts().touch(entry, topologyVersion());
+
+ if (enlisted != null)
enlisted.add(cacheKey);
- if (txEntry.op() == TRANSFORM) {
- GridCacheVersion ver;
+ if (!pessimistic() && !implicit()) {
+ txEntry.markValid();
- try {
- ver = entry.version();
- }
- catch (GridCacheEntryRemovedException e) {
- assert optimistic() : txEntry;
+ if (old == null) {
+ if (needVal)
+ loadMissed = true;
+ else {
+ assert !implicit() || !transform : this;
+ assert txEntry.op() != TRANSFORM : txEntry;
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
+ if (retval)
+ ret.set(cacheCtx, null, true);
+ else
+ ret.success(true);
+ }
+ }
+ else {
+ if (needReadVer) {
+ assert readVer != null;
- ver = null;
+ txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
- addInvokeResult(txEntry, txEntry.value(), ret, ver);
- }
- }
+ if (retval && !transform)
+ ret.set(cacheCtx, old, true);
+ else {
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
- if (!pessimistic()) {
- txEntry.markValid();
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException ex) {
+ assert optimistic() : txEntry;
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version " +
+ "[err=" + ex.getMessage() + ']');
+
+ ver = null;
+ }
+ addInvokeResult(txEntry, old, ret, ver);
+ }
+ else
+ ret.success(true);
+ }
+ }
+ }
+ // Pessimistic.
+ else {
if (retval && !transform)
- ret.set(cacheCtx, v, true);
+ ret.set(cacheCtx, old, true);
else
ret.success(true);
}
- }
+
+ break; // While.
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ if (log.isDebugEnabled())
+ log.debug("Got removed entry in transaction putAll0 method: " + entry);
+ }
}
}
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
-
- if (missedForLoad != null) {
- final boolean skipVals = singleRmv;
-
- IgniteInternalFuture<Void> fut = loadMissing(
- cacheCtx,
- /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
- /*async*/true,
- missedForLoad,
- skipVals,
- needReadVer,
- new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
- @Override public void apply(KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer) {
- if (log.isDebugEnabled())
- log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
+ else {
+ if (entryProcessor == null && txEntry.op() == TRANSFORM)
+ throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " +
+ "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false));
- IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId()));
+ GridCacheEntryEx entry = txEntry.cached();
- assert e != null;
+ CacheObject v = txEntry.value();
- if (needReadVer) {
- assert loadVer != null;
+ boolean del = txEntry.op() == DELETE && rmv;
- e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
- }
+ if (!del) {
+ if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) {
+ ret.set(cacheCtx, v, false);
- if (singleRmv) {
- assert !hasFilters && !retval;
- assert val == null || Boolean.TRUE.equals(val) : val;
+ return loadMissed;
+ }
- ret.set(cacheCtx, null, val != null);
- }
- else {
- CacheObject cacheVal = cacheCtx.toCacheObject(val);
+ GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM :
+ v != null ? UPDATE : CREATE;
- if (e.op() == TRANSFORM) {
- GridCacheVersion ver;
+ txEntry = addEntry(op,
+ cacheCtx.toCacheObject(val),
+ entryProcessor,
+ invokeArgs,
+ entry,
+ expiryPlc,
+ filter,
+ true,
+ drTtl,
+ drExpireTime,
+ drVer,
+ skipStore);
- try {
- ver = e.cached().version();
- }
- catch (GridCacheEntryRemovedException ex) {
- assert optimistic() : e;
+ if (enlisted != null)
+ enlisted.add(cacheKey);
- if (log.isDebugEnabled())
- log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']');
+ if (txEntry.op() == TRANSFORM) {
+ GridCacheVersion ver;
- ver = null;
- }
+ try {
+ ver = entry.version();
+ }
+ catch (GridCacheEntryRemovedException e) {
+ assert optimistic() : txEntry;
- addInvokeResult(e, cacheVal, ret, ver);
- }
- else {
- boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter);
+ if (log.isDebugEnabled())
+ log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']');
- ret.set(cacheCtx, cacheVal, success);
- }
- }
+ ver = null;
}
- });
- return new GridEmbeddedFuture<>(
- new C2<Void, Exception, Set<KeyCacheObject>>() {
- @Override public Set<KeyCacheObject> apply(Void b, Exception e) {
- if (e != null)
- throw new GridClosureException(e);
+ addInvokeResult(txEntry, txEntry.value(), ret, ver);
+ }
+ }
- return Collections.emptySet();
- }
- }, fut
- );
+ if (!pessimistic()) {
+ txEntry.markValid();
+
+ if (retval && !transform)
+ ret.set(cacheCtx, v, true);
+ else
+ ret.success(true);
+ }
}
- return new GridFinishedFuture<>(skipped);
+ return loadMissed;
}
/**
@@ -2486,22 +2634,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
*
* @param cacheCtx Context.
* @param keys Keys.
- * @param failed Collection of potentially failed keys (need to populate in this method).
* @param ret Return value.
* @param rmv {@code True} if remove.
* @param retval Flag to return value or not.
* @param read {@code True} if read.
* @param accessTtl TTL for read operation.
* @param filter Filter to check entries.
- * @return Failed keys.
* @throws IgniteCheckedException If error.
* @param computeInvoke If {@code true} computes return value for invoke operation.
*/
@SuppressWarnings("unchecked")
- protected Set<KeyCacheObject> postLockWrite(
+ protected final void postLockWrite(
GridCacheContext cacheCtx,
Iterable<KeyCacheObject> keys,
- Set<KeyCacheObject> failed,
GridCacheReturn ret,
boolean rmv,
boolean retval,
@@ -2606,8 +2751,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
log.debug("Filter passed in post lock for key: " + k);
}
else {
- failed = skip(failed, k);
-
// Revert operation to previous. (if no - NOOP, so entry will be unlocked).
txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value()));
txEntry.filters(CU.empty0());
@@ -2638,11 +2781,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
}
-
- if (log.isDebugEnabled())
- log.debug("Entries that failed after lock filter check: " + failed);
-
- return failed;
}
/**
@@ -2696,6 +2834,144 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @param cacheCtx Cache context checked for allowed updates and {@code CACHE_PUT} permission.
+ * @param retval Return value flag; if {@code true} then {@code needReturnValue(true)} is called.
+ * @throws IgniteCheckedException If one of the pre-update checks failed.
+ */
+ private void beforePut(GridCacheContext cacheCtx, boolean retval) throws IgniteCheckedException {
+ checkUpdatesAllowed(cacheCtx);
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
+
+ if (retval)
+ needReturnValue(true);
+
+ checkValid();
+
+ init();
+ }
+
+ /**
+ * Internal method for single-key update (put or invoke); pessimistic transactions lock the key first.
+ *
+ * @param cacheCtx Cache context.
+ * @param key Key, must not be {@code null}.
+ * @param val Value (may be {@code null}).
+ * @param entryProcessor Entry processor (may be {@code null}).
+ * @param invokeArgs Optional arguments for EntryProcessor (may be {@code null}).
+ * @param retval Return value flag.
+ * @param filter Filter passed to enlist and to the post-lock check (may be {@code null}).
+ * @return Operation future; a finished error future if {@link IgniteCheckedException} was thrown.
+ */
+ private <K, V> IgniteInternalFuture putAsync0(
+ final GridCacheContext cacheCtx,
+ K key,
+ @Nullable V val,
+ @Nullable EntryProcessor entryProcessor,
+ @Nullable final Object[] invokeArgs,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate[] filter
+ ) {
+ assert key != null;
+
+ try {
+ beforePut(cacheCtx, retval);
+
+ final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
+
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ cacheKey,
+ val,
+ opCtx != null ? opCtx.expiry() : null,
+ entryProcessor,
+ invokeArgs,
+ retval,
+ /*lockOnly*/false,
+ filter,
+ ret,
+ opCtx != null && opCtx.skipStore(),
+ /*singleRmv*/false);
+
+ if (pessimistic()) {
+ assert loadFut == null || loadFut.isDone() : loadFut;
+
+ final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey);
+
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for put on key: " + enlisted);
+
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ lockTimeout(),
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override public GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
+
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
+ /*remove*/false,
+ retval,
+ /*read*/false,
+ -1L,
+ filter,
+ /*computeInvoke*/true);
+
+ return ret;
+ }
+ };
+
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
+ try {
+ return nonInterruptable(plc1.apply(false, e));
+ }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
+ }
+ }
+ }
+ else {
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ }
+ else
+ return optimisticPutFuture(loadFut, ret); // NOTE(review): assert above tolerates null loadFut, but optimisticPutFuture dereferences it - confirm never null here.
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
* Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
* maps must be non-null.
*
@@ -2721,17 +2997,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert filter == null || invokeMap == null;
try {
- checkUpdatesAllowed(cacheCtx);
+ beforePut(cacheCtx, retval);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture(e);
}
- cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
-
- if (retval)
- needReturnValue(true);
-
// Cached entry may be passed only from entry wrapper.
final Map<?, ?> map0;
final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
@@ -2757,15 +3028,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert map0 != null || invokeMap0 != null;
- try {
- checkValid();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
-
- init();
-
final GridCacheReturn ret = new GridCacheReturn(localResult(), false);
if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
@@ -2783,15 +3045,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
try {
Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
- Collection<KeyCacheObject> enlisted = new ArrayList<>();
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
cacheCtx,
keySet,
opCtx != null ? opCtx.expiry() : null,
- implicit,
map0,
invokeMap0,
invokeArgs,
@@ -2806,15 +3067,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
false);
if (pessimistic()) {
- // Loose all skipped.
- final Set<KeyCacheObject> loaded = loadFut.get();
-
- final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded));
+ assert loadFut == null || loadFut.isDone() : loadFut;
if (log.isDebugEnabled())
- log.debug("Before acquiring transaction lock for put on keys: " + keys);
+ log.debug("Before acquiring transaction lock for put on keys: " + enlisted);
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys,
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
lockTimeout(),
this,
false,
@@ -2828,11 +3086,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
throws IgniteCheckedException
{
if (log.isDebugEnabled())
- log.debug("Acquired transaction lock for put on keys: " + keys);
+ log.debug("Acquired transaction lock for put on keys: " + enlisted);
postLockWrite(cacheCtx,
- keys,
- loaded,
+ enlisted,
ret,
/*remove*/false,
retval,
@@ -2861,64 +3118,79 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
}
}
- else
+ else {
return nonInterruptable(new GridEmbeddedFuture<>(
fut,
plc1
));
+ }
}
- else {
- if (implicit()) {
- // Should never load missing values for implicit transaction as values will be returned
- // with prepare response, if required.
- assert loadFut.isDone();
+ else
+ return optimisticPutFuture(loadFut, ret);
+ }
+ catch (RuntimeException e) {
+ onException();
- try {
- loadFut.get();
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(e);
- }
+ throw e;
+ }
+ }
- return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException {
- try {
- txFut.get();
+ /**
+ * @param loadFut Missing keys load future.
+ * @param ret Future result.
+ * @return Future.
+ */
+ private IgniteInternalFuture optimisticPutFuture(IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret) {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
- return implicitRes;
- }
- catch (IgniteCheckedException | RuntimeException e) {
- rollbackAsync();
+ try {
+ loadFut.get();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
- throw e;
- }
+ return nonInterruptable(commitAsync().chain(
+ new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
+
+ return implicitRes;
}
- }));
- }
- else
- return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException {
- f.get();
+ catch (IgniteCheckedException | RuntimeException e) {
+ rollbackAsync();
- return ret;
+ throw e;
}
- }));
- }
+ }
+ }
+ ));
}
- catch (RuntimeException e) {
- for (IgniteTxEntry txEntry : txMap.values()) {
- GridCacheEntryEx cached0 = txEntry.cached();
-
- if (cached0 != null)
- txEntry.context().evicts().touch(cached0, topologyVersion());
- }
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException {
+ f.get();
- throw e;
+ return ret;
+ }
+ }));
}
- catch (IgniteCheckedException e) {
- setRollbackOnly();
+ }
- return new GridFinishedFuture<>(e);
+ /**
+ * Touches the cached entry (if any) of every transaction entry via the entry's eviction manager.
+ */
+ private void onException() {
+ for (IgniteTxEntry txEntry : allEntries()) {
+ GridCacheEntryEx cached0 = txEntry.cached();
+
+ if (cached0 != null)
+ txEntry.context().evicts().touch(cached0, topologyVersion());
+ }
}
@@ -2974,9 +3246,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
assert keys0 != null;
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit +
", retval=" + retval + "]");
+ }
try {
checkValid();
@@ -3002,140 +3275,131 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
init();
- try {
- Collection<KeyCacheObject> enlisted = new ArrayList<>();
-
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>();
- ExpiryPolicy plc;
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- if (!F.isEmpty(filter))
- plc = opCtx != null ? opCtx.expiry() : null;
- else
- plc = null;
+ ExpiryPolicy plc;
- final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite(
- cacheCtx,
- keys0,
- plc,
- implicit,
- /** lookup map */null,
- /** invoke map */null,
- /** invoke arguments */null,
- retval,
- /** lock only */false,
- filter,
- ret,
- enlisted,
- null,
- drMap,
- opCtx != null && opCtx.skipStore(),
- singleRmv
- );
+ if (!F.isEmpty(filter))
+ plc = opCtx != null ? opCtx.expiry() : null;
+ else
+ plc = null;
- if (log.isDebugEnabled())
- log.debug("Remove keys: " + enlisted);
+ final IgniteInternalFuture<Void> loadFut = enlistWrite(
+ cacheCtx,
+ keys0,
+ plc,
+ /** lookup map */null,
+ /** invoke map */null,
+ /** invoke arguments */null,
+ retval,
+ /** lock only */false,
+ filter,
+ ret,
+ enlisted,
+ null,
+ drMap,
+ opCtx != null && opCtx.skipStore(),
+ singleRmv
+ );
- // Acquire locks only after having added operation to the write set.
- // Otherwise, during rollback we will not know whether locks need
- // to be rolled back.
- if (pessimistic()) {
- // Loose all skipped.
- final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get()));
+ if (log.isDebugEnabled())
+ log.debug("Remove keys: " + enlisted);
- if (log.isDebugEnabled())
- log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys);
+ // Acquire locks only after having added operation to the write set.
+ // Otherwise, during rollback we will not know whether locks need
+ // to be rolled back.
+ if (pessimistic()) {
+ assert loadFut.isDone() : loadFut;
- IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys,
- lockTimeout(),
- this,
- false,
- retval,
- isolation,
- isInvalidate(),
- -1L);
+ if (log.isDebugEnabled())
+ log.debug("Before acquiring transaction lock for remove on keys: " + enlisted);
- PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
- @Override protected GridCacheReturn postLock(GridCacheReturn ret)
- throws IgniteCheckedException
- {
- if (log.isDebugEnabled())
- log.debug("Acquired transaction lock for remove on keys: " + passedKeys);
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
+ lockTimeout(),
+ this,
+ false,
+ retval,
+ isolation,
+ isInvalidate(),
+ -1L);
+
+ PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret)
+ throws IgniteCheckedException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Acquired transaction lock for remove on keys: " + enlisted);
- postLockWrite(cacheCtx,
- passedKeys,
- loadFut.get(),
- ret,
+ postLockWrite(cacheCtx,
+ enlisted,
+ ret,
/*remove*/true,
- retval,
+ retval,
/*read*/false,
- -1L,
- filter,
+ -1L,
+ filter,
/*computeInvoke*/false);
- return ret;
- }
- };
+ return ret;
+ }
+ };
- if (fut.isDone()) {
+ if (fut.isDone()) {
+ try {
+ return nonInterruptable(plc1.apply(fut.get(), null));
+ }
+ catch (GridClosureException e) {
+ return new GridFinishedFuture<>(e.unwrap());
+ }
+ catch (IgniteCheckedException e) {
try {
- return nonInterruptable(plc1.apply(fut.get(), null));
+ return nonInterruptable(plc1.apply(false, e));
}
- catch (GridClosureException e) {
- return new GridFinishedFuture<>(e.unwrap());
- }
- catch (IgniteCheckedException e) {
- try {
- return nonInterruptable(plc1.apply(false, e));
- }
- catch (Exception e1) {
- return new GridFinishedFuture<>(e1);
- }
+ catch (Exception e1) {
+ return new GridFinishedFuture<>(e1);
}
}
- else
- return nonInterruptable(new GridEmbeddedFuture<>(
- fut,
- plc1
- ));
}
- else {
- if (implicit()) {
- // Should never load missing values for implicit transaction as values will be returned
- // with prepare response, if required.
- assert loadFut.isDone();
-
- return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
- throws IgniteCheckedException {
- try {
- txFut.get();
+ else
+ return nonInterruptable(new GridEmbeddedFuture<>(
+ fut,
+ plc1
+ ));
+ }
+ else {
+ if (implicit()) {
+ // Should never load missing values for implicit transaction as values will be returned
+ // with prepare response, if required.
+ assert loadFut.isDone();
- return implicitRes;
- }
- catch (IgniteCheckedException | RuntimeException e) {
- rollbackAsync();
+ return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut)
+ throws IgniteCheckedException {
+ try {
+ txFut.get();
- throw e;
- }
+ return implicitRes;
}
- }));
- }
- else
- return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() {
- @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f)
- throws IgniteCheckedException {
- f.get();
+ catch (IgniteCheckedException | RuntimeException e) {
+ rollbackAsync();
- return ret;
+ throw e;
}
- }));
+ }
+ }));
}
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
+ else {
+ return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() {
+ @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f)
+ throws IgniteCheckedException {
+ f.get();
- return new GridFinishedFuture<>(e);
+ return ret;
+ }
+ }));
+ }
}
}
@@ -3169,16 +3433,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @return {@code True} if transaction was successfully started.
*/
public boolean init() {
- if (txMap == null) {
- txMap = new LinkedHashMap<>(txSize > 0 ? txSize : 16, 1.0f);
+ return !txState.init(txSize) || cctx.tm().onStarted(this);
- readView = new IgniteTxMap(txMap, CU.reads());
- writeView = new IgniteTxMap(txMap, CU.writes());
-
- return cctx.tm().onStarted(this);
- }
-
- return true;
}
/**
@@ -3188,38 +3444,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
* @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given
* cache (e.g. they have different stores).
*/
- protected void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException {
- int cacheId = cacheCtx.cacheId();
-
- // Check if we can enlist new cache to transaction.
- if (!activeCacheIds.contains(cacheId)) {
- String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx);
-
- if (err != null) {
- StringBuilder cacheNames = new StringBuilder();
-
- int idx = 0;
-
- for (Integer activeCacheId : activeCacheIds) {
- cacheNames.append(cctx.cacheContext(activeCacheId).name());
-
- if (idx++ < activeCacheIds.size() - 1)
- cacheNames.append(", ");
- }
-
- throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" +
- err +
- ") [activeCaches=[" + cacheNames + "]" +
- ", cacheName=" + cacheCtx.name() +
- ", cacheSystem=" + cacheCtx.systemTx() +
- ", txSystem=" + system() + ']');
- }
- else
- activeCacheIds.add(cacheId);
-
- if (activeCacheIds.size() == 1)
- depEnabled = cacheCtx.deploymentEnabled();
- }
+ protected final void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException {
+ txState.addActiveCache(cacheCtx, this);
}
/**
@@ -3294,7 +3520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
"Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" +
Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']';
- IgniteTxEntry old = txMap.get(key);
+ IgniteTxEntry old = entry(key);
// Keep old filter if already have one (empty filter is always overridden).
if (!filtersSet || !F.isEmptyOrNulls(filter)) {
@@ -3358,7 +3584,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
if (!hasDrTtl)
txEntry.expiry(expiryPlc);
- txMap.put(key, txEntry);
+ txState.addEntry(txEntry);
if (log.isDebugEnabled())
log.debug("Created transaction entry: " + txEntry);
@@ -3420,7 +3646,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(),
- "size", (txMap == null ? 0 : txMap.size()));
+ "size", allEntries().size());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 0d83338..5dc3338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -93,9 +93,37 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
/**
* @param cacheCtx Cache context.
+ * @param key Key.
+ * @param val Value.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Future for put operation.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
+ GridCacheContext cacheCtx,
+ K key,
+ V val,
+ boolean retval,
+ CacheEntryPredicate[] filter);
+
+ /**
+ * @param cacheCtx Cache context.
+ * @param key Key.
+ * @param entryProcessor Entry processor.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @return Operation future.
+ */
+ public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+ GridCacheContext cacheCtx,
+ K key,
+ EntryProcessor<K, V, Object> entryProcessor,
+ Object... invokeArgs);
+
+ /**
+ * @param cacheCtx Cache context.
* @param map Entry processors map.
* @param invokeArgs Optional arguments for entry processor.
- * @return Transform operation future.
+ * @return Operation future.
*/
public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
GridCacheContext cacheCtx,
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
new file mode 100644
index 0000000..123d396
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+/**
+ * State of a local transaction: extends {@link IgniteTxState} with entry registration and initialization.
+ */
+public interface IgniteTxLocalState extends IgniteTxState {
+ /**
+ * @param entry Transaction entry to add to this state.
+ */
+ public void addEntry(IgniteTxEntry entry);
+
+ /**
+ * @param txSize Transaction size.
+ * @return {@code True} if state was initialized by this call; presumably {@code false} if already initialized (TODO confirm).
+ */
+ public boolean init(int txSize);
+
+ /**
+ * @return {@code True} if init method was called.
+ */
+ public boolean initialized();
+
+ /**
+ * Seals this state. NOTE(review): exact contract is not documented here - confirm against implementations.
+ */
+ public void seal();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
new file mode 100644
index 0000000..cde5203
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Base class for local transaction states; provides shared per-cache transaction end metrics update.
+ */
+public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState {
+ /**
+ * @param cacheCtx Cache context whose metrics are updated when statistics are enabled.
+ * @param tx Transaction; its start time is used to compute the reported duration.
+ * @param commit {@code True} if transaction committed, {@code false} if transaction rolled back.
+ */
+ protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx tx, boolean commit) {
+ if (cacheCtx.cache().configuration().isStatisticsEnabled()) {
+ // Elapsed ms are multiplied by 1000. NOTE(review): that yields microseconds, not ns - confirm unit expected by metrics.
+ if (commit)
+ cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000);
+ else
+ cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c2e7dea..ccccca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1092,13 +1092,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!tx.system())
cctx.txMetrics().onTxCommit();
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
- if (cacheCtx.cache().configuration().isStatisticsEnabled())
- // Convert start time from ms to ns.
- cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000);
- }
+ tx.txState().onTxEnd(cctx, tx, true);
}
if (slowTxWarnTimeout > 0 && tx.local() &&
@@ -1163,13 +1157,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!tx.system())
cctx.txMetrics().onTxRollback();
- for (int cacheId : tx.activeCacheIds()) {
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
- if (cacheCtx.cache().configuration().isStatisticsEnabled())
- // Convert start time from ms to ns.
- cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000);
- }
+ tx.txState().onTxEnd(cctx, tx, false);
}
if (log.isDebugEnabled())
@@ -1233,7 +1221,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!tx.system())
threadMap.remove(tx.threadId(), tx);
else {
- Integer cacheId = F.first(tx.activeCacheIds());
+ Integer cacheId = tx.txState().firstCacheId();
if (cacheId != null)
sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
index 6408573..429c995 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
@@ -170,8 +170,7 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme
}
/** {@inheritDoc} */
- @Nullable
- @Override public IgniteTxEntry get(Object key) {
+ @Nullable @Override public IgniteTxEntry get(Object key) {
IgniteTxEntry e = txMap.get(key);
return e == null ? null : filter.apply(e) ? e : null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index 9660e4e..b80909f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -25,22 +25,13 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*/
public interface IgniteTxRemoteEx extends IgniteInternalTx {
/**
- * @return Remote thread ID.
- */
- public long remoteThreadId();
-
- /**
* @param baseVer Base version.
* @param committedVers Committed version.
* @param rolledbackVers Rolled back version.
* @param pendingVers Pending versions.
*/
- public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
- Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers);
-
- /**
- * @param e Sets write value for pessimistic transactions.
- * @return {@code True} if entry was found.
- */
- public boolean setWriteValue(IgniteTxEntry e);
+ public void doneRemote(GridCacheVersion baseVer,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers,
+ Collection<GridCacheVersion> pendingVers);
}
\ No newline at end of file