You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/02/09 17:43:08 UTC
[08/24] ignite git commit: ignite-4465 Prevent cache entry eviction
while it is loaded from store (otherwise loaded value can be not stored in
cache).
ignite-4465 Prevent cache entry eviction while it is loaded from store (otherwise loaded value can be not stored in cache).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22b7e76c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22b7e76c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22b7e76c
Branch: refs/heads/master
Commit: 22b7e76c00a77a06388bcef869f29d1a572a306a
Parents: 7117647
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jan 17 12:33:32 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jan 17 12:33:32 2017 +0300
----------------------------------------------------------------------
.../processors/cache/EntryGetResult.java | 65 +++++++
.../processors/cache/GridCacheAdapter.java | 104 ++++++-----
.../processors/cache/GridCacheEntryEx.java | 30 ++-
.../processors/cache/GridCacheMapEntry.java | 100 ++++++++--
.../dht/GridPartitionedGetFuture.java | 7 +-
.../dht/GridPartitionedSingleGetFuture.java | 7 +-
.../dht/atomic/GridDhtAtomicCache.java | 7 +-
.../dht/colocated/GridDhtColocatedCache.java | 7 +-
.../distributed/near/GridNearGetFuture.java | 13 +-
.../local/atomic/GridLocalAtomicCache.java | 7 +-
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 29 +--
.../cache/CacheConcurrentReadThroughTest.java | 184 +++++++++++++++++++
.../cache/CrossCacheTxRandomOperationsTest.java | 28 ++-
.../processors/cache/GridCacheTestEntryEx.java | 21 ++-
.../testsuites/IgniteCacheTestSuite2.java | 2 +
16 files changed, 512 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java
new file mode 100644
index 0000000..a34ddae
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryGetResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ *
+ */
+public class EntryGetResult {
+ /** */
+ private final CacheObject val;
+
+ /** */
+ private final GridCacheVersion ver;
+
+ /** */
+ private final boolean reserved;
+
+ /**
+ * @param val Value.
+ * @param ver Version.
+ */
+ EntryGetResult(CacheObject val, GridCacheVersion ver, boolean reserved) {
+ this.val = val;
+ this.ver = ver;
+ this.reserved = reserved;
+ }
+
+ /**
+ * @return Value.
+ */
+ public CacheObject value() {
+ return val;
+ }
+
+ /**
+ * @return Version.
+ */
+ public GridCacheVersion version() {
+ return ver;
+ }
+
+ /**
+ * @return Reserved flag,
+ */
+ public boolean reserved() {
+ return reserved;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 965c6d1..fd9f396 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1860,7 +1860,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param needVer If {@code true} returns values as tuples containing value and version.
* @return Future.
*/
- public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
+ protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(
@Nullable final Collection<KeyCacheObject> keys,
final boolean readThrough,
boolean checkTx,
@@ -1906,7 +1906,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled();
- Map<KeyCacheObject, GridCacheVersion> misses = null;
+ Map<KeyCacheObject, EntryGetResult> misses = null;
for (KeyCacheObject key : keys) {
while (true) {
@@ -1920,40 +1920,58 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
try {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
- null,
- null,
- ctx.isSwapOrOffheapEnabled(),
- /*unmarshal*/true,
- /*update-metrics*/!skipVals,
- /*event*/!skipVals,
- subjId,
- null,
- taskName,
- expiry,
- !deserializeBinary);
-
- if (res == null) {
- if (storeEnabled) {
- GridCacheVersion ver = entry.version();
+ EntryGetResult res;
+ boolean evt = !skipVals;
+ boolean updateMetrics = !skipVals;
+
+ if (storeEnabled) {
+ res = entry.innerGetAndReserveForLoad(ctx.isSwapOrOffheapEnabled(),
+ updateMetrics,
+ evt,
+ subjId,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ assert res != null;
+
+ if (res.value() == null) {
if (misses == null)
misses = new HashMap<>();
- misses.put(key, ver);
+ misses.put(key, res);
+
+ res = null;
}
- else
- ctx.evicts().touch(entry, topVer);
}
else {
+ res = entry.innerGetVersioned(
+ null,
+ null,
+ ctx.isSwapOrOffheapEnabled(),
+ /*unmarshal*/true,
+ updateMetrics,
+ evt,
+ subjId,
+ null,
+ taskName,
+ expiry,
+ !deserializeBinary);
+
+ if (res == null)
+ ctx.evicts().touch(entry, topVer);
+ }
+
+ if (res != null) {
ctx.addResult(map,
key,
- res.get1(),
+ res.value(),
skipVals,
keepCacheObjects,
deserializeBinary,
true,
- needVer ? res.get2() : null);
+ needVer ? res.version() : null);
if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))
ctx.evicts().touch(entry, topVer);
@@ -1973,7 +1991,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
if (storeEnabled && misses != null) {
- final Map<KeyCacheObject, GridCacheVersion> loadKeys = misses;
+ final Map<KeyCacheObject, EntryGetResult> loadKeys = misses;
final IgniteTxLocalAdapter tx0 = tx;
@@ -1984,15 +2002,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Override public Map<K1, V1> call() throws Exception {
ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() {
@Override public void apply(KeyCacheObject key, Object val) {
- GridCacheVersion ver = loadKeys.get(key);
-
- if (ver == null) {
- if (log.isDebugEnabled())
- log.debug("Value from storage was never asked for [key=" + key +
- ", val=" + val + ']');
+ EntryGetResult res = loadKeys.get(key);
+ if (res == null || val == null)
return;
- }
loaded.add(key);
@@ -2002,14 +2015,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
GridCacheEntryEx entry = entryEx(key);
try {
- GridCacheVersion verSet = entry.versionedValue(cacheVal, ver, null);
+ GridCacheVersion verSet = entry.versionedValue(cacheVal,
+ res.version(),
+ null);
boolean set = verSet != null;
if (log.isDebugEnabled())
log.debug("Set value loaded from store into entry [" +
"set=" + set +
- ", curVer=" + ver +
+ ", curVer=" + res.version() +
", newVer=" + verSet + ", " +
"entry=" + entry + ']');
@@ -2022,7 +2037,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
keepCacheObjects,
deserializeBinary,
false,
- needVer ? set ? verSet : ver : null);
+ needVer ? set ? verSet : res.version() : null);
}
if (tx0 == null || (!tx0.implicit() &&
@@ -2045,16 +2060,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
});
if (loaded.size() != loadKeys.size()) {
- for (KeyCacheObject key : loadKeys.keySet()) {
- if (loaded.contains(key))
+ boolean needTouch =
+ tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED);
+
+ for (Map.Entry<KeyCacheObject, EntryGetResult> e : loadKeys.entrySet()) {
+ if (loaded.contains(e.getKey()))
continue;
- if (tx0 == null || (!tx0.implicit() &&
- tx0.isolation() == READ_COMMITTED)) {
- GridCacheEntryEx entry = peekEx(key);
+ if (needTouch || e.getValue().reserved()) {
+ GridCacheEntryEx entry = peekEx(e.getKey());
- if (entry != null)
- ctx.evicts().touch(entry, topVer);
+ if (entry != null) {
+ if (e.getValue().reserved())
+ entry.clearReserveForLoad(e.getValue().version());
+
+ if (needTouch)
+ ctx.evicts().touch(entry, topVer);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index d8194fc..b1d632f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.lang.GridTuple3;
-import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
@@ -323,7 +322,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If loading value failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Nullable public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
boolean readSwap,
@@ -338,6 +337,33 @@ public interface GridCacheEntryEx {
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
+ * @param readSwap Flag indicating whether to check swap memory.
+ * @param updateMetrics If {@code true} then metrics should be updated.
+ * @param evt Flag to signal event notification.
+ * @param subjId Subject ID initiated this read.
+ * @param taskName Task name.
+ * @param expiryPlc Expiry policy.
+ * @param keepBinary Keep binary flag.
+ * @return Cached value and entry version.
+ * @throws IgniteCheckedException If loading value failed.
+ * @throws GridCacheEntryRemovedException If entry was removed.
+ * @return Cached value, entry version and flag indicating if entry was reserved.
+ */
+ public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+ /**
+ * @param ver Expected entry version.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void clearReserveForLoad(GridCacheVersion ver) throws IgniteCheckedException;
+
+ /**
* Reloads entry from underlying storage.
*
* @return Reloaded value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 31baeda..9f0c2b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -106,6 +106,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
private static final byte IS_SWAPPING_REQUIRED = 0x08;
/** */
+ private static final byte IS_EVICT_DISABLED = 0x10;
+
+ /** */
public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
/**
@@ -774,11 +777,37 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName,
expirePlc,
false,
- keepBinary);
+ keepBinary,
+ false);
}
/** {@inheritDoc} */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Override public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ return (EntryGetResult)innerGet0(
+ /*ver*/null,
+ /*tx*/null,
+ readSwap,
+ /*readThrough*/false,
+ evt,
+ updateMetrics,
+ /*tmp*/false,
+ subjId,
+ /*transformClo*/null,
+ taskName,
+ expiryPlc,
+ true,
+ keepBinary,
+ /*reserve*/true);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
boolean readSwap,
@@ -791,7 +820,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean keepBinary)
throws IgniteCheckedException, GridCacheEntryRemovedException {
- return (T2<CacheObject, GridCacheVersion>)innerGet0(ver,
+ return (EntryGetResult)innerGet0(ver,
tx,
readSwap,
false,
@@ -803,7 +832,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName,
expiryPlc,
true,
- keepBinary);
+ keepBinary,
+ false);
}
/** {@inheritDoc} */
@@ -821,16 +851,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
String taskName,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean retVer,
- boolean keepBinary
+ boolean keepBinary,
+ boolean reserveForLoad
) throws IgniteCheckedException, GridCacheEntryRemovedException {
assert !(retVer && readThrough);
+ assert !(reserveForLoad && readThrough);
// Disable read-through if there is no store.
if (readThrough && !cctx.readThrough())
readThrough = false;
- CacheObject ret;
-
GridCacheVersion startVer;
GridCacheVersion resVer = null;
@@ -838,6 +868,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean deferred = false;
GridCacheVersion ver0 = null;
+ Object res = null;
+
synchronized (this) {
checkObsolete();
@@ -881,7 +913,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
else
val = null;
- ret = val;
+ CacheObject ret = val;
if (ret == null) {
if (updateMetrics && cctx.cache().configuration().isStatisticsEnabled())
@@ -928,15 +960,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Cache version for optimistic check.
startVer = ver;
- }
- if (ret != null) {
- assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
- assert !obsolete;
- assert !deferred;
+ if (ret != null) {
+ assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
+ assert !obsolete;
+ assert !deferred;
+
+ // If return value is consistent, then done.
+ res = retVer ? new EntryGetResult(ret, resVer, false) : ret;
+ }
+ else if (reserveForLoad && !obsolete) {
+ assert !readThrough;
+ assert retVer;
+
+ boolean reserve = !evictionDisabled();
- // If return value is consistent, then done.
- return retVer ? new T2<>(ret, resVer) : ret;
+ if (reserve)
+ flags |= IS_EVICT_DISABLED;
+
+ res = new EntryGetResult(null, resVer, reserve);
+ }
}
if (obsolete) {
@@ -948,6 +991,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (deferred)
cctx.onDeferredDelete(this, ver0);
+ if (res != null)
+ return res;
+
+ CacheObject ret = null;
+
if (readThrough) {
IgniteInternalTx tx0 = null;
@@ -2926,7 +2974,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return {@code True} if this entry should not be evicted from cache.
*/
protected boolean evictionDisabled() {
- return false;
+ return (flags & IS_EVICT_DISABLED) != 0;
}
/**
@@ -3008,6 +3056,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
value(null);
ver = newVer;
+ flags &= ~IS_EVICT_DISABLED;
if (log.isTraceEnabled()) {
log.trace("invalidate releaseSwap [key=" + key +
@@ -3096,6 +3145,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
ttlAndExpireTimeExtras(ttl, expireTime);
this.ver = ver;
+ flags &= ~IS_EVICT_DISABLED;
if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
cctx.ttl().addTrackedEntry(this);
@@ -3549,11 +3599,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
/** {@inheritDoc} */
+ @Override public synchronized void clearReserveForLoad(GridCacheVersion ver) throws IgniteCheckedException {
+ if (obsoleteVersionExtras() != null)
+ return;
+
+ if (ver.equals(this.ver)) {
+ assert evictionDisabled() : this;
+
+ flags &= ~IS_EVICT_DISABLED;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public synchronized GridCacheVersion versionedValue(CacheObject val,
GridCacheVersion curVer,
GridCacheVersion newVer)
- throws IgniteCheckedException, GridCacheEntryRemovedException {
-
+ throws IgniteCheckedException, GridCacheEntryRemovedException
+ {
checkObsolete();
if (curVer == null || curVer.equals(ver)) {
@@ -3587,6 +3649,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
return newVer;
}
+
+ assert !evictionDisabled() : this;
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 2e22d9e..d0f209d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -449,7 +450,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -463,8 +464,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
!deserializeBinary);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index aeb7eba..e188a35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -373,7 +374,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -387,8 +388,8 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
true);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 940c74e..94a049e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -1493,7 +1494,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -1507,8 +1508,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 176a90f..56af95e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -484,7 +485,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
GridCacheVersion ver = null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -498,8 +499,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
!deserializeBinary);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index b7fcbbd..ab0e88c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -437,7 +438,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
// First we peek into near cache.
if (isNear) {
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -451,8 +452,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
!deserializeBinary);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
@@ -577,7 +578,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+ EntryGetResult res = dhtEntry.innerGetVersioned(
null,
null,
/*swap*/true,
@@ -591,8 +592,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
!deserializeBinary);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a419887..d1acada 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -516,7 +517,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
GridCacheVersion ver;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
null,
/*swap*/swapOrOffheap,
@@ -530,8 +531,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
!deserializeBinary);
if (res != null) {
- v = res.get1();
- ver = res.get2();
+ v = res.value();
+ ver = res.version();
ctx.addResult(
vals,
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index cf69264..f784ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1141,7 +1141,7 @@ public class IgniteTxHandler {
else
sendReply(nodeId, req, true, null);
- assert req.txState() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
+ assert req.txState() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) : req;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index e2f8438..91c9c92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -426,7 +427,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
continue;
try {
- T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+ EntryGetResult res = entry.innerGetVersioned(
null,
this,
/*readSwap*/true,
@@ -446,7 +447,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
misses.put(key, entry.version());
}
else
- c.apply(key, skipVals ? true : res.get1(), res.get2());
+ c.apply(key, skipVals ? true : res.value(), res.version());
break;
}
@@ -1220,7 +1221,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned(
+ EntryGetResult res = txEntry.cached().innerGetVersioned(
null,
this,
/*swap*/true,
@@ -1234,8 +1235,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txEntry.keepBinary());
if (res != null) {
- val = res.get1();
- readVer = res.get2();
+ val = res.value();
+ readVer = res.version();
}
}
else {
@@ -1303,7 +1304,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;
if (needReadVer) {
- T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ EntryGetResult res = primaryLocal(entry) ?
entry.innerGetVersioned(
null,
this,
@@ -1318,8 +1319,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
!deserializeBinary) : null;
if (res != null) {
- val = res.get1();
- readVer = res.get2();
+ val = res.value();
+ readVer = res.version();
}
}
else {
@@ -1654,7 +1655,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
F.first(txEntry.entryProcessors()) : null;
if (needVer) {
- T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned(
+ EntryGetResult res = cached.innerGetVersioned(
null,
IgniteTxLocalAdapter.this,
/*swap*/cacheCtx.isSwapOrOffheapEnabled(),
@@ -1668,8 +1669,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txEntry.keepBinary());
if (res != null) {
- val = res.get1();
- readVer = res.get2();
+ val = res.value();
+ readVer = res.version();
}
}
else{
@@ -2377,7 +2378,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (optimistic() && !implicit()) {
try {
if (needReadVer) {
- T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ?
+ EntryGetResult res = primaryLocal(entry) ?
entry.innerGetVersioned(
null,
this,
@@ -2392,8 +2393,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
keepBinary) : null;
if (res != null) {
- old = res.get1();
- readVer = res.get2();
+ old = res.value();
+ readVer = res.version();
}
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConcurrentReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConcurrentReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConcurrentReadThroughTest.java
new file mode 100644
index 0000000..87baa49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheConcurrentReadThroughTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test was added to check fix for IGNITE-4465.
+ */
+public class CacheConcurrentReadThroughTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int SYS_THREADS = 16;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ if (!client) {
+ cfg.setPublicThreadPoolSize(SYS_THREADS);
+ cfg.setSystemThreadPoolSize(SYS_THREADS);
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConcurrentReadThrough() throws Exception {
+ startGrid(0);
+
+ client = true;
+
+ Ignite client = startGrid(1);
+
+ assertTrue(client.configuration().isClientMode());
+
+ IgniteCompute compute = client.compute().withAsync();
+
+ for (int iter = 0; iter < 10; iter++) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ final String cacheName = "test-" + iter;
+
+ ccfg.setName(cacheName);
+ ccfg.setReadThrough(true);
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+ ccfg.setStatisticsEnabled(true);
+
+ client.createCache(ccfg);
+
+ final Integer key = 1;
+
+ TestCacheStore.loadCnt.set(0);
+
+ Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < SYS_THREADS * 3; i++) {
+ compute.run(new IgniteRunnable() {
+ @IgniteInstanceResource
+ private transient Ignite ignite;
+
+ @Override public void run() {
+ assertFalse(ignite.configuration().isClientMode());
+
+ Object v = ignite.<Integer, Integer>cache(cacheName).get(key);
+
+ if (v == null)
+ throw new IgniteException("Failed to get value");
+ }
+ });
+
+ futs.add(compute.future());
+ }
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ int loadCnt = TestCacheStore.loadCnt.get();
+
+ long misses = ignite(1).cache(cacheName).metrics().getCacheMisses();
+
+ log.info("Iteration [iter=" + iter + ", loadCnt=" + loadCnt + ", misses=" + misses + ']');
+
+ assertTrue("Unexpected loadCnt: " + loadCnt, loadCnt > 0 && loadCnt <= SYS_THREADS);
+ assertTrue("Unexpected misses: " + misses, misses > 0 && misses <= SYS_THREADS);
+
+ client.destroyCache(cacheName);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<TestCacheStore> {
+ /** {@inheritDoc} */
+ @Override public TestCacheStore create() {
+ return new TestCacheStore();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCacheStore extends CacheStoreAdapter<Integer, Integer> {
+ /** */
+ private static final AtomicInteger loadCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ loadCnt.incrementAndGet();
+
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
index 67ec371..e7df3c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -86,6 +86,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 6 * 60 * 1000;
+ }
+
+ /** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
@@ -170,9 +175,17 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
}
/**
+ * @param cacheMode Cache mode.
+ * @param writeSync Write synchronization mode.
+ * @param fairAff Fair affinity flag.
+ * @param ignite Node to use.
+ * @param name Cache name.
*/
- protected void createCache(CacheMode cacheMode, CacheWriteSynchronizationMode writeSync, boolean fairAff,
- Ignite ignite, String name) {
+ protected void createCache(CacheMode cacheMode,
+ CacheWriteSynchronizationMode writeSync,
+ boolean fairAff,
+ Ignite ignite,
+ String name) {
ignite.createCache(cacheConfiguration(name, cacheMode, writeSync, fairAff));
}
@@ -269,9 +282,18 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
boolean checkData = fullSync && !optimistic;
+ long stopTime = System.currentTimeMillis() + 10_000;
+
for (int i = 0; i < 10_000; i++) {
- if (i % 100 == 0)
+ if (i % 100 == 0) {
+ if (System.currentTimeMillis() > stopTime) {
+ log.info("Stop on timeout, iteration: " + i);
+
+ break;
+ }
+
log.info("Iteration: " + i);
+ }
boolean rollback = i % 10 == 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 48621af..b03e9c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -418,7 +418,26 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
}
/** @inheritDoc */
- @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
+ @Override public void clearReserveForLoad(GridCacheVersion ver) {
+ assert false;
+ }
+
+ /** @inheritDoc */
+ @Override public EntryGetResult innerGetAndReserveForLoad(
+ boolean readSwap,
+ boolean updateMetrics,
+ boolean evt,
+ UUID subjId,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiryPlc,
+ boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException {
+ assert false;
+
+ return null;
+ }
+
+ /** @inheritDoc */
+ @Nullable @Override public EntryGetResult innerGetVersioned(
@Nullable GridCacheVersion ver,
IgniteInternalTx tx,
boolean readSwap,
http://git-wip-us.apache.org/repos/asf/ignite/blob/22b7e76c/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index f632f67..8792ea1 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionBackupFilterSel
import org.apache.ignite.cache.affinity.fair.FairAffinityFunctionExcludeNeighborsSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionBackupFilterSelfTest;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunctionExcludeNeighborsSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheConcurrentReadThroughTest;
import org.apache.ignite.internal.processors.cache.CacheConfigurationLeakTest;
import org.apache.ignite.internal.processors.cache.CacheDhtLocalPartitionAfterRemoveSelfTest;
import org.apache.ignite.internal.processors.cache.CacheEnumOperationsSingleNodeTest;
@@ -268,6 +269,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTest(new TestSuite(CacheExchangeMessageDuplicatedStateTest.class));
suite.addTest(new TestSuite(OffheapCacheOnClientsTest.class));
+ suite.addTest(new TestSuite(CacheConcurrentReadThroughTest.class));
return suite;
}