You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/10/01 05:55:21 UTC
[11/21] ignite git commit: IGNITE-7764: MVCC: cache API support. This
closes #4725.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b8c78bd..9493510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTrackerImpl;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -72,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionProxy
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyRollbackOnlyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
@@ -191,11 +192,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/** */
private MvccQueryTracker mvccTracker;
- /** Whether this transaction is for SQL operations or not.<p>
+ /** Whether this is an MVCC transaction or not.<p>
* {@code null} means there haven't been any calls made on this transaction, and first operation will give this
* field actual value.
*/
- private Boolean sql;
+ private Boolean mvccOp;
/**
* Empty constructor required for {@link Externalizable}.
@@ -205,7 +206,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
- * @param ctx Cache registry.
+ * @param ctx Cache registry.
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
* @param sys System flag.
@@ -214,7 +215,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @param isolation Isolation.
* @param timeout Timeout.
* @param storeEnabled Store enabled flag.
- * @param sql Whether this transaction was started via SQL API or not, or {@code null} if unknown.
+ * @param mvccOp Whether this transaction is an MVCC transaction or not, or {@code null} if unknown.
* @param txSize Transaction size.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
@@ -230,7 +231,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
TransactionIsolation isolation,
long timeout,
boolean storeEnabled,
- Boolean sql,
+ Boolean mvccOp,
int txSize,
@Nullable UUID subjId,
int taskNameHash,
@@ -257,7 +258,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl();
- this.sql = sql;
+ this.mvccOp = mvccOp;
initResult();
@@ -574,6 +575,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
) {
assert key != null;
+ if (cacheCtx.mvccEnabled())
+ return mvccPutAllAsync0(cacheCtx, Collections.singletonMap(key, val),
+ entryProcessor == null ? null : Collections.singletonMap(key, entryProcessor), invokeArgs, retval, filter);
+
try {
beforePut(cacheCtx, retval, false);
@@ -628,7 +633,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
if (log.isDebugEnabled())
log.debug("Before acquiring transaction lock for put on key: " + enlisted);
- IgniteInternalFuture<Boolean>fut = cacheCtx.cache().txLockAsync(enlisted,
+ IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted,
timeout,
this,
/*read*/entryProcessor != null, // Needed to force load from store.
@@ -696,6 +701,142 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
+ * Validate Tx mode.
+ *
+ * @param ctx Cache context.
+ * @throws IgniteCheckedException If tx mode is not supported.
+ */
+ protected void validateTxMode(GridCacheContext ctx) throws IgniteCheckedException {
+ if(!ctx.mvccEnabled() || pessimistic() && repeatableRead())
+ return;
+
+ throw new IgniteCheckedException("Only pessimistic repeatable read transactions are supported at the moment.");
+ }
+
+ /**
+ * Internal method for put and transform operations in Mvcc mode.
+ * Note: Only one of {@code map}, {@code invokeMap} maps must be non-null.
+ *
+ * @param cacheCtx Context.
+ * @param map Key-value map to store.
+ * @param invokeMap Invoke map.
+ * @param invokeArgs Optional arguments for EntryProcessor.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @return Operation future.
+ */
+ private <K, V> IgniteInternalFuture mvccPutAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
+ @Nullable final Object[] invokeArgs,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter
+ ) {
+ try {
+ validateTxMode(cacheCtx);
+
+ // TODO: IGNITE-9540: Fix invoke/invokeAll.
+ if(invokeMap != null)
+ MvccUtils.verifyMvccOperationSupport(cacheCtx, "invoke/invokeAll");
+
+ if (mvccSnapshot == null) {
+ MvccUtils.mvccTracker(cacheCtx, this);
+
+ assert mvccSnapshot != null;
+ }
+
+ beforePut(cacheCtx, retval, true);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+
+ if (log.isDebugEnabled())
+ log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
+
+ assert map0 != null || invokeMap0 != null;
+
+ if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) {
+ if (implicit())
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return new GridFinishedFuture<>(new GridCacheReturn(true, false));
+ }
+
+ try {
+ // Set transform flag for transaction.
+ if (invokeMap != null)
+ transform = true;
+
+ Set<?> keys = map0 != null ? map0.keySet() : invokeMap0.keySet();
+
+ final Map<KeyCacheObject, CacheObject> enlisted = new HashMap<>(keys.size());
+
+ for (Object key : keys) {
+ if (isRollbackOnly())
+ return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException());
+
+ if (key == null) {
+ rollback();
+
+ throw new NullPointerException("Null key.");
+ }
+
+ Object val = map0 == null ? null : map0.get(key);
+ EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null;
+
+ if (val == null && entryProcessor == null) {
+ setRollbackOnly();
+
+ throw new NullPointerException("Null value.");
+ }
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+ CacheObject cacheVal = cacheCtx.toCacheObject(val);
+
+ enlisted.put(cacheKey, cacheVal);
+ }
+
+ return updateAsync(cacheCtx, new UpdateSourceIterator<IgniteBiTuple<KeyCacheObject, CacheObject>>() {
+
+ private Iterator<Map.Entry<KeyCacheObject, CacheObject>> it = enlisted.entrySet().iterator();
+
+ @Override public EnlistOperation operation() {
+ return EnlistOperation.UPSERT;
+ }
+
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ return it.hasNext();
+ }
+
+ @Override public IgniteBiTuple<KeyCacheObject, CacheObject> nextX() throws IgniteCheckedException {
+ Map.Entry<KeyCacheObject, CacheObject> next = it.next();
+
+ return new IgniteBiTuple<>(next.getKey(), next.getValue());
+ }
+ }, retval, filter, remainingTime(), true);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
* Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap}
* maps must be non-null.
*
@@ -717,6 +858,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
@Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
final boolean retval
) {
+ if (cacheCtx.mvccEnabled())
+ return mvccPutAllAsync0(cacheCtx, map, invokeMap, invokeArgs, retval, null);
+
try {
beforePut(cacheCtx, retval, false);
}
@@ -1549,6 +1693,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
final boolean retval,
@Nullable final CacheEntryPredicate filter,
boolean singleRmv) {
+ if(cacheCtx.mvccEnabled())
+ return mvccRemoveAllAsync0(cacheCtx, keys, retval, filter);
+
try {
checkUpdatesAllowed(cacheCtx);
}
@@ -1558,9 +1705,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
- if (cacheCtx.mvccEnabled() && !isOperationAllowed(false))
- return txTypeMismatchFinishFuture();
-
if (retval)
needReturnValue(true);
@@ -1690,9 +1834,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
-1L);
PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) {
+ /** {@inheritDoc} */
@Override protected GridCacheReturn postLock(GridCacheReturn ret)
- throws IgniteCheckedException
- {
+ throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Acquired transaction lock for remove on keys: " + enlisted);
@@ -1769,6 +1913,93 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
+ * Internal method for remove operations in Mvcc mode.
+ *
+ * @param cacheCtx Cache context.
+ * @param keys Keys to remove.
+ * @param retval Flag indicating whether a value should be returned.
+ * @param filter Filter.
+ * @return Future for asynchronous remove.
+ */
+ @SuppressWarnings("unchecked")
+ private <K, V> IgniteInternalFuture<GridCacheReturn> mvccRemoveAllAsync0(
+ final GridCacheContext cacheCtx,
+ @Nullable final Collection<? extends K> keys,
+ final boolean retval,
+ @Nullable final CacheEntryPredicate filter
+ ) {
+ try {
+ validateTxMode(cacheCtx);
+
+ if (mvccSnapshot == null) {
+ MvccUtils.mvccTracker(cacheCtx, this);
+
+ assert mvccSnapshot != null;
+ }
+
+ beforeRemove(cacheCtx, retval, true);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ if (F.isEmpty(keys)) {
+ if (implicit()) {
+ try {
+ commit();
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+ }
+
+ return new GridFinishedFuture<>(new GridCacheReturn(localResult(), true));
+ }
+
+ init();
+
+ Set<KeyCacheObject> enlisted = new HashSet<>(keys.size());
+
+ try {
+ for (Object key : keys) {
+ if (isRollbackOnly())
+ return new GridFinishedFuture<>(timedOut() ? timeoutException() : rollbackException());
+
+ if (key == null) {
+ rollback();
+
+ throw new NullPointerException("Null key.");
+ }
+
+ KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
+
+ enlisted.add(cacheKey);
+ }
+
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ return updateAsync(cacheCtx, new UpdateSourceIterator<KeyCacheObject>() {
+
+ private Iterator<KeyCacheObject> it = enlisted.iterator();
+
+ @Override public EnlistOperation operation() {
+ return EnlistOperation.DELETE;
+ }
+
+ @Override public boolean hasNextX() throws IgniteCheckedException {
+ return it.hasNext();
+ }
+
+ @Override public KeyCacheObject nextX() throws IgniteCheckedException {
+ return it.next();
+ }
+ }, retval, filter, remainingTime(), true);
+ }
+
+ /**
* @param cctx Cache context.
* @return Mvcc snapshot for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs).
*/
@@ -1846,10 +2077,67 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
+ * Executes key-value update operation in Mvcc mode.
+ *
+ * @param cacheCtx Cache context.
+ * @param it Entries iterator.
+ * @param retval Return value flag.
+ * @param filter Filter.
+ * @param timeout Timeout.
+ * @param sequential Sequential locking flag.
+ * @return Operation future.
+ */
+ private IgniteInternalFuture<GridCacheReturn> updateAsync(GridCacheContext cacheCtx,
+ UpdateSourceIterator<?> it,
+ boolean retval,
+ @Nullable CacheEntryPredicate filter,
+ long timeout,
+ boolean sequential) {
+ try {
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+ /* TODO: IGNITE-9688: 'sequential' is always true here which can slow down bulk operations,
+ but possibly we can safely optimize this. */
+
+ GridNearTxEnlistFuture fut = new GridNearTxEnlistFuture(cacheCtx, this,
+ timeout, it, 0, sequential, filter, retval);
+
+ fut.init();
+
+ return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Boolean>() {
+ @Override public Boolean applyx(IgniteInternalFuture<GridCacheReturn> fut0) throws IgniteCheckedException {
+ fut0.get();
+
+ return true;
+ }
+ }), new PLC1<GridCacheReturn>(null) {
+ @Override protected GridCacheReturn postLock(GridCacheReturn ret) throws IgniteCheckedException {
+ GridCacheReturn futRes = fut.get();
+
+ assert futRes != null;
+
+ mvccSnapshot.incrementOperationCounter();
+
+ return new GridCacheReturn(cacheCtx, true, keepBinary, futRes.value(), futRes.success());
+ }
+ }));
+ }
+ catch (RuntimeException e) {
+ onException();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Executes update query operation in Mvcc mode.
+ *
* @param fut Enlist future.
* @return Operation future.
*/
- public IgniteInternalFuture<Long> updateAsync(GridNearTxAbstractEnlistFuture fut) {
+ public IgniteInternalFuture<Long> updateAsync(GridNearTxQueryAbstractEnlistFuture fut) {
fut.init();
return nonInterruptable(new GridEmbeddedFuture<>(fut.chain(new CX1<IgniteInternalFuture<Long>, Boolean>() {
@@ -1900,36 +2188,18 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
- if (cacheCtx.mvccEnabled() && !isOperationAllowed(false))
+ try {
+ validateTxMode(cacheCtx);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture(e);
+ }
+
+ if (cacheCtx.mvccEnabled() && !isOperationAllowed(true))
return txTypeMismatchFinishFuture();
init();
- if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) {
- // TODO IGNITE-7388: support async tx rollback (e.g. on timeout).
- boolean canRemap = cctx.lockedTopologyVersion(null) == null;
-
- mvccTracker = new MvccQueryTrackerImpl(cacheCtx, canRemap);
-
- return new GridEmbeddedFuture<>(mvccTracker.requestSnapshot(topologyVersion()),
- new IgniteBiClosure<MvccSnapshot, Exception, IgniteInternalFuture<Map<K, V>>>() {
- @Override public IgniteInternalFuture<Map<K, V>> apply(MvccSnapshot snapshot, Exception e) {
- if (e != null)
- return new GridFinishedFuture<>(e);
-
- return getAllAsync(cacheCtx,
- entryTopVer,
- keys,
- deserializeBinary,
- skipVals,
- keepCacheObjects,
- skipStore,
- recovery,
- needVer);
- }
- });
- }
-
int keysCnt = keys.size();
boolean single = keysCnt == 1;
@@ -2234,8 +2504,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @param keepCacheObjects Keep cache objects flag.
* @param skipStore Skip store flag.
* @param recovery Recovery flag..
- * @throws IgniteCheckedException If failed.
* @return Enlisted keys.
+ * @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"RedundantTypeArguments"})
private <K, V> Collection<KeyCacheObject> enlistRead(
@@ -2568,8 +2838,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c =
new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() {
@Override public void apply(KeyCacheObject key,
- @Nullable Object val,
- @Nullable GridCacheVersion loadVer) {
+ @Nullable Object val,
+ @Nullable GridCacheVersion loadVer) {
if (log.isDebugEnabled())
log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']');
@@ -2729,7 +2999,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
- * @param cacheCtx Cache context.
+ * @param cacheCtx Cache context.
* @param readThrough Read through flag.
* @param async if {@code True}, then loading will happen in a separate thread.
* @param keys Keys.
@@ -2868,7 +3138,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
- * @param cacheCtx Cache context.
+ * @param cacheCtx Cache context.
* @param readThrough Read through flag.
* @param async if {@code True}, then loading will happen in a separate thread.
* @param keys Keys.
@@ -3498,7 +3768,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
if (log.isDebugEnabled())
log.debug("Committing near local tx: " + this);
- final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish;
+ final NearTxFinishFuture fut;
+ final NearTxFinishFuture fut0 = finishFut;
+
+ boolean fastFinish;
if (fut0 != null || !FINISH_FUT_UPD.compareAndSet(this, null, fut = finishFuture(fastFinish = fastFinish(), true)))
return chainFinishFuture(finishFut, true, true, false);
@@ -3577,9 +3850,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
IgniteInternalFuture<?> prepFut = this.prepFut;
if (onTimeout && prepFut instanceof GridNearTxPrepareFutureAdapter && !prepFut.isDone())
- ((GridNearTxPrepareFutureAdapter) prepFut).onNearTxLocalTimeout();
+ ((GridNearTxPrepareFutureAdapter)prepFut).onNearTxLocalTimeout();
+
+ final NearTxFinishFuture fut;
+ final NearTxFinishFuture fut0 = finishFut;
- final NearTxFinishFuture fut, fut0 = finishFut; boolean fastFinish;
+ boolean fastFinish;
if (fut0 != null)
return chainFinishFuture(finishFut, false, clearThreadMap, onTimeout);
@@ -3627,9 +3903,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
- * @return Transaction commit future.
+ * Finish transaction.
+ *
* @param fast {@code True} in case of fast finish.
* @param commit {@code True} if commit.
+ * @return Transaction commit future.
*/
private NearTxFinishFuture finishFuture(boolean fast, boolean commit) {
NearTxFinishFuture fut = fast ? new GridNearTxFastFinishFuture(this, commit) :
@@ -3724,7 +4002,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @return {@code True} if 'fast finish' path can be used for transaction completion.
*/
private boolean fastFinish() {
- return writeMap().isEmpty()
+ return writeMap().isEmpty() && !queryEnlisted()
&& ((optimistic() && !serializable()) || readMap().isEmpty())
&& (mappings.single() || F.view(mappings.mappings(), CU.FILTER_QUERY_MAPPING).isEmpty());
}
@@ -4174,14 +4452,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @return {@code true} if this transaction does not have type flag set or it matches invoking operation,
* {@code false} otherwise.
*/
- public boolean isOperationAllowed(boolean sqlOp) {
- if (sql == null) {
- sql = sqlOp;
+ public boolean isOperationAllowed(boolean mvccOp) {
+ if (this.mvccOp == null) {
+ this.mvccOp = mvccOp;
return true;
}
- return sql == sqlOp;
+ return this.mvccOp == mvccOp;
}
/**
@@ -4385,17 +4663,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/**
* @param cacheCtx Cache context.
* @param retval Return value flag.
- * @param sql SQL operation flag.
+ * @param mvccOp MVCC operation flag.
* @throws IgniteCheckedException If failed.
*/
- private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean sql) throws IgniteCheckedException {
- assert !sql || cacheCtx.mvccEnabled();
+ private void beforePut(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException {
+ assert !mvccOp || cacheCtx.mvccEnabled();
checkUpdatesAllowed(cacheCtx);
cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT);
- if (cacheCtx.mvccEnabled() && !isOperationAllowed(sql))
+ if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp))
throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG);
if (retval)
@@ -4408,6 +4686,28 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
/**
* @param cacheCtx Cache context.
+ * @param retval Return value flag.
+ * @param mvccOp MVCC operation flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void beforeRemove(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException {
+ assert !mvccOp || cacheCtx.mvccEnabled();
+
+ checkUpdatesAllowed(cacheCtx);
+
+ cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE);
+
+ if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp))
+ throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG);
+
+ if (retval)
+ needReturnValue(true);
+
+ checkValid();
+ }
+
+ /**
+ * @param cacheCtx Cache context.
* @throws IgniteCheckedException If updates are not allowed.
*/
private void checkUpdatesAllowed(GridCacheContext cacheCtx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
new file mode 100644
index 0000000..714c62d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryAbstractEnlistFuture.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ *
+ */
+public abstract class GridNearTxQueryAbstractEnlistFuture extends GridNearTxAbstractEnlistFuture<Long> {
+ /**
+ * @param cctx Cache context.
+ * @param tx Transaction.
+ * @param timeout Timeout.
+ */
+ public GridNearTxQueryAbstractEnlistFuture(
+ GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) {
+ super(cctx, tx, timeout, CU.longReducer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index 9a2dfa3..6d48b97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -43,9 +43,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTx
* Cache lock future.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
-public class GridNearTxQueryEnlistFuture extends GridNearTxAbstractEnlistFuture {
- /** */
- private static final long serialVersionUID = -2155104765461405820L;
+public class GridNearTxQueryEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
/** Involved cache ids. */
private final int[] cacheIds;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
index dae1e81..d628de1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistResponse.java
@@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
- *
+ * A response to {@link GridNearTxQueryEnlistRequest}.
*/
public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements ExceptionAware {
/** */
@@ -99,6 +99,7 @@ public class GridNearTxQueryEnlistResponse extends GridCacheIdMessage implements
* @param lockVer Lock version.
* @param res Result.
* @param removeMapping Remove mapping flag.
+ * @param newDhtNodes New DHT nodes involved in the transaction.
*/
public GridNearTxQueryEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, long res,
boolean removeMapping, Set<UUID> newDhtNodes) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index 2452b92..b83339b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -62,10 +62,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* A future tracking requests for remote nodes transaction enlisting and locking
* of entries produced with complex DML queries requiring reduce step.
*/
-public class GridNearTxQueryResultsEnlistFuture extends GridNearTxAbstractEnlistFuture {
- /** */
- private static final long serialVersionUID = 4339957209840477447L;
-
+public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractEnlistFuture {
/** */
public static final int DFLT_BATCH_SIZE = 1024;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
index 94cacfa..48c63bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistResponse.java
@@ -54,7 +54,7 @@ public class GridNearTxQueryResultsEnlistResponse extends GridNearTxQueryEnlistR
* @param res Result.
* @param dhtFutId Dht future id.
* @param dhtVer Dht version.
- * @param newDhtNodes New
+ * @param newDhtNodes New DHT nodes involved in the transaction.
*/
public GridNearTxQueryResultsEnlistResponse(int cacheId,
IgniteUuid futId,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index ff1c85f..ca77bf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -1233,7 +1233,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
for (TxKey key : waitMap.keySet()) {
assert key.major() == snapshot.coordinatorVersion()
&& key.minor() > snapshot.cleanupVersion()
- || key.major() > snapshot.coordinatorVersion();
+ || key.major() > snapshot.coordinatorVersion() :
+ "key=" + key + ", snapshot=" + snapshot;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
index f46d1e0..9a767ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.NotNull;
@@ -46,7 +47,6 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
private final IgniteLogger log;
/** */
- @GridToStringExclude
private long crdVer;
/** */
@@ -259,6 +259,9 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
IgniteInternalFuture<AffinityTopologyVersion> waitFut =
cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());
+ if(log.isDebugEnabled())
+ log.debug("Remap on new topology: " + waitFut);
+
if (waitFut == null)
requestSnapshot(cctx.shared().exchange().readyAffinityVersion(), lsnr);
else {
@@ -325,6 +328,11 @@ public class MvccQueryTrackerImpl implements MvccQueryTracker {
return true;
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MvccQueryTrackerImpl.class, this);
+ }
+
/** */
private final class ListenerDecorator implements MvccSnapshotResponseListener {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c57a790..16c30c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -1795,13 +1796,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
GridCacheVersion ver,
long expireTime,
MvccSnapshot mvccVer,
+ CacheEntryPredicate filter,
boolean primary,
boolean needHistory,
- boolean noCreate) throws IgniteCheckedException {
+ boolean noCreate,
+ boolean retVal) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccUpdate(
- cctx, key, val, ver, expireTime, mvccVer, primary, needHistory, noCreate);
+ return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, primary,
+ needHistory, noCreate, retVal);
}
/** {@inheritDoc} */
@@ -1809,11 +1812,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
GridCacheContext cctx,
KeyCacheObject key,
MvccSnapshot mvccVer,
+ CacheEntryPredicate filter,
boolean primary,
- boolean needHistory) throws IgniteCheckedException {
+ boolean needHistory,
+ boolean retVal) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.mvccRemove(cctx, key, mvccVer, primary, needHistory);
+ return delegate.mvccRemove(cctx, key, mvccVer, filter, primary, needHistory, retVal);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index fb6293c..d0e3dca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1720,10 +1720,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
public void markQueryEnlisted(MvccSnapshot ver) {
if (!qryEnlisted) {
+ assert ver != null || mvccSnapshot != null;
+
if (mvccSnapshot == null)
mvccSnapshot = ver;
- cctx.coordinators().registerLocalTransaction(ver.coordinatorVersion(), ver.counter());
+ if(dht())
+ cctx.coordinators().registerLocalTransaction(mvccSnapshot.coordinatorVersion(), mvccSnapshot.counter());
qryEnlisted = true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7cc3e55..438c8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -466,7 +466,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout transaction timeout.
- * @param sql Whether this transaction is being started via SQL API or not, or {@code null} if unknown.
+ * @param mvccOp Whether this transaction is being started via SQL API or not, or {@code null} if unknown.
* @param txSize Expected transaction size.
* @param lb Label.
* @return New transaction.
@@ -479,7 +479,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
TransactionIsolation isolation,
long timeout,
boolean storeEnabled,
- Boolean sql,
+ Boolean mvccOp,
int txSize,
@Nullable String lb
) {
@@ -499,7 +499,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
isolation,
timeout,
storeEnabled,
- sql,
+ mvccOp,
txSize,
subjId,
taskNameHash,
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
index 716094e..2a0b582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccUpdateDataRow.java
@@ -21,9 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
@@ -38,6 +41,8 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwar
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -49,6 +54,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isActiv
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccVersionIsValid;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException;
+import static org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType.FILTERED;
/**
*
@@ -94,6 +100,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
/** Whether tx has overridden it's own update. */
private static final int OWN_VALUE_OVERRIDDEN = DELETED << 1;
+ /** Forces reading the full entry instead of the header only. */
+ private static final int NEED_PREV_VALUE = OWN_VALUE_OVERRIDDEN << 1;
+
/** */
@GridToStringExclude
private final GridCacheContext cctx;
@@ -125,6 +134,10 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
/** */
private List<MvccLinkAwareSearchRow> historyRows;
+ /** */
+ @GridToStringExclude
+ private CacheEntryPredicate filter;
+
/**
* @param cctx Cache context.
* @param key Key.
@@ -148,10 +161,12 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
long expireTime,
MvccSnapshot mvccSnapshot,
MvccVersion newVer,
+ @Nullable CacheEntryPredicate filter,
boolean primary,
boolean lockOnly,
boolean needHistory,
- boolean fastUpdate) {
+ boolean fastUpdate,
+ boolean needPrevValue) {
super(key,
val,
ver,
@@ -163,6 +178,7 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
this.mvccSnapshot = mvccSnapshot;
this.cctx = cctx;
+ this.filter = filter;
this.keyAbsentBefore = primary; // True for primary and false for backup (backups do not use this flag).
assert !lockOnly || val == null;
@@ -181,6 +197,9 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
if (fastUpdate)
flags |= FAST_UPDATE;
+ if(needPrevValue)
+ flags |= NEED_PREV_VALUE;
+
setFlags(flags);
}
@@ -237,8 +256,18 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
if (removed)
setFlags(DELETED);
- else
- oldRow = row;
+ else {
+ // Actually, the full row could be omitted for replace(k,newval) and putIfAbsent, but the
+ // operation context is not available here, and the full row is required if a filter is set.
+ if (res == ResultType.PREV_NOT_NULL && (isFlagsSet(NEED_PREV_VALUE) || filter != null))
+ oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL);
+ else
+ oldRow = row;
+ }
+
+ // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}.
+ if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null))
+ res = FILTERED;
setFlags(LAST_COMMITTED_FOUND | OWN_VALUE_OVERRIDDEN);
@@ -293,9 +322,14 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
else {
res = ResultType.PREV_NOT_NULL;
- oldRow = row;
-
keyAbsentBefore = false;
+
+ // Actually, the full row could be omitted for replace(k,newval) and putIfAbsent, but the
+ // operation context is not available here, and the full row is required if a filter is set.
+ if( (isFlagsSet(NEED_PREV_VALUE) || filter != null))
+ oldRow = tree.getRow(io, pageAddr, idx, RowData.FULL);
+ else
+ oldRow = row;
}
if (isFlagsSet(CHECK_VERSION)) {
@@ -337,9 +371,13 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
}
}
+ // TODO: IGNITE-9689: optimize filter usage here. See {@link org.apache.ignite.internal.processors.cache.CacheOperationFilter}.
+ if(filter != null && !applyFilter(res == ResultType.PREV_NOT_NULL ? oldRow.value() : null))
+ res = FILTERED;
+
// Lock entry for primary partition if needed.
// If invisible row is found for FAST_UPDATE case we should not lock row.
- if (isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) {
+ if (!isFlagsSet(DELETED) && isFlagsSet(PRIMARY | REMOVE_OR_LOCK) && !isFlagsSet(FAST_MISMATCH)) {
rowIo.setMvccLockCoordinatorVersion(pageAddr, idx, mvccCrd);
rowIo.setMvccLockCounter(pageAddr, idx, mvccCntr);
@@ -423,6 +461,22 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
return unsetFlags(FIRST);
}
+ /**
+ * Applies {@code filter} to a {@code GridDhtDetachedCacheEntry} built for {@code key} whose visible value is {@code val0}.
+ *
+ * @param val0 Value the entry returns from {@code peekVisibleValue()}; callers in this class pass the previous value or {@code null}.
+ * @return Result of {@code filter.apply} for that entry.
+ */
+ private boolean applyFilter(final CacheObject val0) {
+ GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) {
+ @Nullable @Override public CacheObject peekVisibleValue() {
+ return val0;
+ }
+ };
+
+ return filter.apply(e);
+ }
+
/** {@inheritDoc} */
@Override public int state() {
return state;
@@ -436,10 +490,26 @@ public class MvccUpdateDataRow extends MvccDataRow implements MvccUpdateResult,
}
/**
- * @return {@code True} if previous value was non-null.
+ * @return Result type.
*/
- @Override public ResultType resultType() {
- return res == null ? ResultType.PREV_NULL : res;
+ @NotNull @Override public ResultType resultType() {
+ return res == null ? defaultResult() : res;
+ }
+
+ /**
+ * Computes the result type used while {@code res} is unset and stores it in {@code res}: {@code FILTERED} if a filter is set and rejects a {@code null} value, otherwise {@code PREV_NULL}.
+ *
+ * @return Result type that was stored in {@code res}.
+ */
+ @NotNull private ResultType defaultResult() {
+ assert res == null;
+
+ if (filter != null && !applyFilter(null))
+ res = FILTERED;
+ else
+ res = ResultType.PREV_NULL; // Default.
+
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
index eecb4a5..16e7e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/ResultType.java
@@ -30,5 +30,7 @@ public enum ResultType {
/** */
LOCKED,
/** */
- VERSION_MISMATCH
+ VERSION_MISMATCH,
+ /** */
+ FILTERED
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 9bbf03d..1a3c8d7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -483,8 +483,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
/** {@inheritDoc} */
@Override public GridCacheUpdateTxResult mvccSet(@Nullable IgniteInternalTx tx, UUID affNodeId, CacheObject val,
long ttl0, AffinityTopologyVersion topVer, MvccSnapshot mvccVer,
- GridCacheOperation op, boolean needHistory,
- boolean noCreate) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ GridCacheOperation op, boolean needHistory, boolean noCreate, CacheEntryPredicate filter, boolean retVal)
+ throws IgniteCheckedException, GridCacheEntryRemovedException {
rawPut(val, ttl);
return new GridCacheUpdateTxResult(true);
@@ -492,7 +492,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
/** {@inheritDoc} */
@Override public GridCacheUpdateTxResult mvccRemove(@Nullable IgniteInternalTx tx, UUID affNodeId,
- AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory)
+ AffinityTopologyVersion topVer, MvccSnapshot mvccVer, boolean needHistory,
+ CacheEntryPredicate filter, boolean retVal)
throws IgniteCheckedException, GridCacheEntryRemovedException {
obsoleteVer = ver;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
index dfc8b05..6a00ea4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxIteratorSelfTest.java
@@ -27,6 +27,9 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -154,6 +157,13 @@ public class IgniteCacheTxIteratorSelfTest extends GridCommonAbstractTest {
for (TransactionIsolation iso : TransactionIsolation.values()) {
for (TransactionConcurrency con : TransactionConcurrency.values()) {
try (Transaction transaction = ignite.transactions().txStart(con, iso)) {
+ // TODO: IGNITE-7187: Fix once the ticket is implemented (near cache).
+ // TODO: IGNITE-7956: Fix once the ticket is implemented (eviction).
+ if (((IgniteCacheProxy)cache).context().mvccEnabled() &&
+ ((iso != TransactionIsolation.REPEATABLE_READ && con != TransactionConcurrency.PESSIMISTIC)
+ || nearEnabled || useEvicPlc))
+ return; // Nothing to do. Mode is not supported.
+
assertEquals(val, cache.get(key));
transaction.commit();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
index b2cbe05..c1718b5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractBasicCoordinatorFailoverTest.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,10 +52,8 @@ import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
* Base class for Mvcc coordinator failover test.
@@ -73,6 +71,8 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
ReadMode readMode,
WriteMode writeMode
) throws Exception {
+ assert concurrency == PESSIMISTIC && isolation == REPEATABLE_READ;
+
testSpi = true;
startGrids(3);
@@ -169,7 +169,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
final int KEYS = 100;
- final Map<Integer, Integer> vals = new HashMap<>();
+ final Map<Integer, Integer> vals = new LinkedHashMap<>();
for (int i = 0; i < KEYS; i++)
vals.put(i, 0);
@@ -298,7 +298,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
Integer val = 1;
while (!done.get()) {
- Map<Integer, Integer> vals = new HashMap<>();
+ Map<Integer, Integer> vals = new LinkedHashMap<>();
for (int i = 0; i < KEYS; i++)
vals.put(i, val);
@@ -479,9 +479,6 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
", crdChangeCnt=" + crdChangeCnt +
", readInTx=" + readInTx + ']');
- TransactionConcurrency concurrency = readMode == ReadMode.GET ? OPTIMISTIC : PESSIMISTIC; // TODO IGNITE-7184
- TransactionIsolation isolation = readMode == ReadMode.GET ? SERIALIZABLE : REPEATABLE_READ; // TODO IGNITE-7184
-
testSpi = true;
client = false;
@@ -510,7 +507,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
final IgniteCache cache = getNode.createCache(ccfg);
- final Set<Integer> keys = new HashSet<>();
+ final Set<Integer> keys = new TreeSet<>();
List<Integer> keys1 = primaryKeys(jcache(COORDS), 10);
List<Integer> keys2 = primaryKeys(jcache(COORDS + 1), 10);
@@ -518,7 +515,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
keys.addAll(keys1);
keys.addAll(keys2);
- Map<Integer, Integer> vals = new HashMap();
+ Map<Integer, Integer> vals = new LinkedHashMap();
for (Integer key : keys)
vals.put(key, -1);
@@ -544,7 +541,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
Map<Integer, Integer> res = null;
if (readInTx) {
- try (Transaction tx = getNode.transactions().txStart(concurrency, isolation)) {
+ try (Transaction tx = getNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
res = readAllByMode(cache, keys, readMode, INTEGER_CODEC);
tx.rollback();
@@ -581,7 +578,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
stopGrid(i);
for (int i = 0; i < 10; i++) {
- vals = new HashMap();
+ vals = new LinkedHashMap();
for (Integer key : keys)
vals.put(key, i);
@@ -636,7 +633,7 @@ public abstract class CacheMvccAbstractBasicCoordinatorFailoverTest extends Cach
final IgniteCache cache = client.createCache(ccfg);
- final Map<Integer, Integer> vals = new HashMap();
+ final Map<Integer, Integer> vals = new LinkedHashMap<>();
for (int i = 0; i < 100; i++)
vals.put(i, i);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
index 54e4315..60f1a2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractCoordinatorFailoverTest.java
@@ -113,27 +113,6 @@ public abstract class CacheMvccAbstractCoordinatorFailoverTest extends CacheMvcc
/**
* @throws Exception If failed.
*/
- public void testCoordinatorFailureSimpleSerializableTxPutGet() throws Exception {
- coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE, GET, PUT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testCoordinatorFailureSimpleOptimisticTxPutGet() throws Exception {
- coordinatorFailureSimple(OPTIMISTIC, REPEATABLE_READ, GET, PUT);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTxInProgressCoordinatorChangeSimple_ReadonlyPutGet() throws Exception {
- txInProgressCoordinatorChangeSimple(OPTIMISTIC, SERIALIZABLE, null, GET, PUT);
- }
-
- /**
- * @throws Exception If failed.
- */
public void testReadInProgressCoordinatorFailsSimple_FromClientPutGet() throws Exception {
readInProgressCoordinatorFailsSimple(true, null, GET, PUT);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
index fe450d1..6c6b8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractFeatureTest.java
@@ -173,7 +173,7 @@ public abstract class CacheMvccAbstractFeatureTest extends CacheMvccAbstractTest
int idx;
do {
- idx = (int) (Math.random() * 100) + 1;
+ idx = (int) (Math.random() * 100);
}
while (!keys.add(idx));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index a4962d1..c191849 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -22,10 +22,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -45,6 +48,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
@@ -92,11 +96,9 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM;
import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -223,18 +225,21 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ persistence = false;
+
try {
- verifyOldVersionsCleaned();
+ if(disableScheduledVacuum)
+ verifyOldVersionsCleaned();
verifyCoordinatorInternalState();
}
finally {
stopAllGrids();
- }
- MvccProcessorImpl.coordinatorAssignClosure(null);
+ MvccProcessorImpl.coordinatorAssignClosure(null);
- cleanPersistenceDir();
+ cleanPersistenceDir();
+ }
super.afterTest();
}
@@ -420,7 +425,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
Integer id1 = Math.min(i1, i2);
Integer id2 = Math.max(i1, i2);
- TreeSet<Integer> keys = new TreeSet<>();
+ Set<Integer> keys = new HashSet<>();
keys.add(id1);
keys.add(id2);
@@ -787,7 +792,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
* @param cache Cache.
* @return All accounts
*/
- private static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) {
+ protected static Map<Integer, MvccTestAccount> getAllSql(TestCache<Integer, MvccTestAccount> cache) {
Map<Integer, MvccTestAccount> accounts = new HashMap<>();
SqlFieldsQuery qry = new SqlFieldsQuery("select _key, val, updateCnt from MvccTestAccount");
@@ -826,12 +831,28 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
* @param cache Cache.
* @param key Key.
*/
- private static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) {
+ protected static void removeSql(TestCache<Integer, MvccTestAccount> cache, Integer key) {
SqlFieldsQuery qry = new SqlFieldsQuery("delete from MvccTestAccount where _key=" + key);
cache.cache.query(qry).getAll();
}
+
+ /**
+ * Runs a {@code merge into MvccTestAccount} SQL statement for the given account via {@code SqlFieldsQuery}.
+ *
+ * @param cache Test cache wrapper whose underlying {@code cache} executes the query.
+ * @param key Value written to the {@code _key} column.
+ * @param val Value written to the {@code val} column.
+ * @param updateCnt Value written to the {@code updateCnt} column.
+ */
+ protected static void mergeSql(TestCache<Integer, MvccTestAccount> cache, Integer key, Integer val, Integer updateCnt) {
+ SqlFieldsQuery qry = new SqlFieldsQuery("merge into MvccTestAccount(_key, val, updateCnt) values " +
+ " (" + key+ ", " + val + ", " + updateCnt + ")");
+
+ cache.cache.query(qry).getAll();
+ }
+
/**
* Inserts account by means of SQL API.
*
@@ -867,9 +888,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
ReadMode readMode,
WriteMode writeMode
) throws Exception {
- if(readMode == SCAN && writeMode == PUT)
- fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
final int RANGE = 20;
final int writers = 4;
@@ -886,15 +904,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
info("Thread range [min=" + min + ", max=" + max + ']');
- Map<Integer, Integer> map = new HashMap<>();
+ // Sorted map for put to avoid deadlocks.
+ Map<Integer, Integer> map = new TreeMap<>();
+
+ // Unordered key sequence.
+ Set<Integer> keys = new LinkedHashSet<>();
int v = idx * 1_000_000;
boolean first = true;
while (!stop.get()) {
- while (map.size() < RANGE)
- map.put(rnd.nextInt(min, max), v);
+ while (keys.size() < RANGE) {
+ int key = rnd.nextInt(min, max);
+
+ if (keys.add(key))
+ map.put(key, v);
+ }
TestCache<Integer, Integer> cache = randomCache(caches, rnd);
@@ -903,9 +929,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
if (!first && rnd.nextBoolean()) {
- Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC);
+ Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
- for (Integer k : map.keySet())
+ for (Integer k : keys)
assertEquals("res=" + res, v - 1, (Object)res.get(k));
}
@@ -917,14 +943,12 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
if (rnd.nextBoolean()) {
- Map<Integer, Integer> res = readAllByMode(cache.cache, map.keySet(), readMode, INTEGER_CODEC);
+ Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
- for (Integer k : map.keySet())
+ for (Integer k : keys)
assertEquals("key=" + k, v, (Object)res.get(k));
}
- map.clear();
-
v++;
}
catch (Exception e) {
@@ -933,6 +957,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
finally {
cache.readUnlock();
+ keys.clear();
+
map.clear();
}
}
@@ -956,6 +982,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
int min = range * RANGE;
int max = min + RANGE;
+ keys.clear();
+
while (keys.size() < RANGE)
keys.add(rnd.nextInt(min, max));
@@ -1003,8 +1031,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
}
}
-
- keys.clear();
}
}
};
@@ -1054,9 +1080,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
)
throws Exception
{
- if(readMode == SCAN && writeMode == PUT)
- fail("https://issues.apache.org/jira/browse/IGNITE-7764");
-
final int TOTAL = 20;
assert N <= TOTAL;
@@ -1071,7 +1094,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
@Override public void apply(IgniteCache<Object, Object> cache) {
final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
- Map<Integer, Integer> vals = new HashMap<>();
+ Map<Integer, Integer> vals = new LinkedHashMap<>();
for (int i = 0; i < TOTAL; i++)
vals.put(i, N);
@@ -1341,6 +1364,9 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
while (System.currentTimeMillis() < stopTime && !stop.get()) {
Thread.sleep(1000);
+ if (System.currentTimeMillis() >= stopTime || stop.get())
+ break;
+
if (restartMode != null) {
switch (restartMode) {
case RESTART_CRD: {
@@ -1806,12 +1832,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
});
+ Map res;
- Map res = (Map)cache.query(scanQry).getAll()
- .stream()
- .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue()));
+ try (QueryCursor qry = cache.query(scanQry)) {
+ res = (Map)qry.getAll()
+ .stream()
+ .collect(Collectors.toMap(v -> ((IgniteBiTuple)v).getKey(), v -> ((IgniteBiTuple)v).getValue()));
- assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size());
+ assertTrue("res.size()=" + res.size() + ", keys.size()=" + keys.size(), res.size() <= keys.size());
+ }
return res;
@@ -1833,29 +1862,29 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
String qry = b.toString();
- SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry);
+ SqlFieldsQuery sqlFieldsQry = new SqlFieldsQuery(qry);
if (emulateLongQry)
sqlFieldsQry.setLazy(true).setPageSize(1);
List<List> rows;
- if (emulateLongQry) {
- FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry);
-
- rows = new ArrayList<>();
+ try (FieldsQueryCursor<List> cur = cache.query(sqlFieldsQry)) {
+ if (emulateLongQry) {
+ rows = new ArrayList<>();
- for (List row : cur) {
- rows.add(row);
+ for (List row : cur) {
+ rows.add(row);
- doSleep(ThreadLocalRandom.current().nextInt(50));
+ doSleep(ThreadLocalRandom.current().nextInt(50));
+ }
}
+ else
+ rows = cur.getAll();
}
- else
- rows = cache.query(sqlFieldsQry).getAll();
if (rows.isEmpty())
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
res = new HashMap();
@@ -1887,7 +1916,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
rows = cur.getAll();
if (rows.isEmpty())
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
res = new HashMap();
@@ -2104,6 +2133,23 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ MvccTestAccount account = (MvccTestAccount)o;
+ return val == account.val &&
+ updateCnt == account.updateCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+
+ return Objects.hash(val, updateCnt);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return "MvccTestAccount{" +
"val=" + val +