You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/11/02 11:14:09 UTC
[ignite] branch master updated: IGNITE-15715 Missed values should be checked by Read Repair (#9505)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4a4195e IGNITE-15715 Missed values should be checked by Read Repair (#9505)
4a4195e is described below
commit 4a4195e4e4a9c59933713fa29fad3d282028230c
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Tue Nov 2 14:13:40 2021 +0300
IGNITE-15715 Missed values should be checked by Read Repair (#9505)
---
.../dht/CacheDistributedGetFutureAdapter.java | 7 ++
.../GridNearReadRepairCheckOnlyFuture.java | 55 ++++++----
.../near/consistency/GridNearReadRepairFuture.java | 72 ++++++++-----
.../consistency/AbstractFullSetReadRepairTest.java | 22 ++--
.../cache/consistency/AbstractReadRepairTest.java | 112 ++++++++++++++++++---
.../cache/consistency/AtomicReadRepairTest.java | 20 +++-
.../ExplicitTransactionalReadRepairTest.java | 20 +++-
.../ImplicitTransactionalReadRepairTest.java | 20 +++-
8 files changed, 256 insertions(+), 72 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 6f52d98..a0b805c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -203,6 +203,13 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
}
/**
+ * @return Keys.
+ */
+ public Collection<KeyCacheObject> keys() {
+ return keys;
+ }
+
+ /**
* @param part Partition.
* @return {@code True} if partition is in owned state.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
index 8573127..48d7ce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
@@ -17,14 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -33,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartition
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -94,33 +95,44 @@ public class GridNearReadRepairCheckOnlyFuture extends GridNearReadRepairAbstrac
/** {@inheritDoc} */
@Override protected void reduce() {
- Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>();
-
- Map<KeyCacheObject, T2<Object, CacheEntryVersion>> prevMap = new HashMap<>();
-
+ Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>(keys.size());
Set<KeyCacheObject> inconsistentKeys = new HashSet<>();
for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
- for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
- KeyCacheObject curKey = entry.getKey();
- EntryGetResult curRes = entry.getValue();
+ for (KeyCacheObject key : fut.keys()) {
+ EntryGetResult curRes = fut.result().get(key);
- Object curVal = ctx.unwrapBinaryIfNeeded(curRes.value(), !deserializeBinary, false, null);
+ if (!resMap.containsKey(key)) {
+ resMap.put(key, curRes);
- T2<Object, CacheEntryVersion> prev = prevMap.get(curKey);
+ continue;
+ }
- if (prev != null) {
- Object prevVal = prev.get1();
- CacheEntryVersion prevVer = prev.get2();
+ EntryGetResult prevRes = resMap.get(key);
- if (prevVer.compareTo(curRes.version()) != 0 || !prevVal.equals(curVal))
- inconsistentKeys.add(curKey);
- }
- else {
- resMap.put(curKey, curRes);
+ if (curRes != null) {
+ if (prevRes == null || prevRes.version().compareTo(curRes.version()) != 0)
+ inconsistentKeys.add(key);
+ else {
+ CacheObjectAdapter curVal = curRes.value();
+ CacheObjectAdapter prevVal = prevRes.value();
- prevMap.put(curKey, new T2<>(curVal, curRes.version()));
+ try {
+ byte[] curBytes = curVal.valueBytes(ctx.cacheObjectContext());
+ byte[] prevBytes = prevVal.valueBytes(ctx.cacheObjectContext());
+
+ if (!Arrays.equals(curBytes, prevBytes))
+ inconsistentKeys.add(key);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return;
+ }
+ }
}
+ else if (prevRes != null)
+ inconsistentKeys.add(key);
}
}
@@ -137,6 +149,9 @@ public class GridNearReadRepairCheckOnlyFuture extends GridNearReadRepairAbstrac
return;
}
+ // Misses recorded to detect partial misses, but should not be propagated when the key is null at each node.
+ resMap.values().removeIf(Objects::isNull);
+
onDone(resMap);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
index 5f4dc16..1bb064e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
@@ -17,17 +17,19 @@
package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.transactions.TransactionState;
/**
@@ -73,37 +75,61 @@ public class GridNearReadRepairFuture extends GridNearReadRepairAbstractFuture {
/** {@inheritDoc} */
@Override protected void reduce() {
- Map<KeyCacheObject, T2<EntryGetResult, Object>> newestMap = new HashMap<>();
- Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>();
+ Map<KeyCacheObject, EntryGetResult> newestMap = new HashMap<>(keys.size()); // Newest entries (by version).
+ Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(); // Newest entries required to be re-committed.
for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
- for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
- KeyCacheObject key = entry.getKey();
+ for (KeyCacheObject key : fut.keys()) {
+ EntryGetResult candidateRes = fut.result().get(key);
- EntryGetResult candidateRes = entry.getValue();
+ if (!newestMap.containsKey(key)) {
+ newestMap.put(key, candidateRes);
- Object candidateVal = ctx.unwrapBinaryIfNeeded(candidateRes.value(), !deserializeBinary, false, null);
-
- newestMap.putIfAbsent(key, new T2<>(candidateRes, candidateVal));
-
- T2<EntryGetResult, Object> newest = newestMap.get(key);
-
- EntryGetResult newestRes = newest.get1();
- Object newestVal = newest.get2();
-
- int verCompareRes = newestRes.version().compareTo(candidateRes.version());
+ continue;
+ }
- if (verCompareRes < 0) {
- newestMap.put(key, new T2<>(candidateRes, candidateVal));
- fixedMap.put(key, candidateRes);
+ EntryGetResult newestRes = newestMap.get(key);
+
+ if (candidateRes != null) {
+ if (newestRes == null) { // Existing data wins.
+ newestMap.put(key, candidateRes);
+ fixedMap.put(key, candidateRes);
+ }
+ else {
+ int compareRes = candidateRes.version().compareTo(newestRes.version());
+
+ if (compareRes > 0) { // Newest data wins.
+ newestMap.put(key, candidateRes);
+ fixedMap.put(key, candidateRes);
+ }
+ else if (compareRes < 0)
+ fixedMap.put(key, newestRes);
+ else if (compareRes == 0) {
+ CacheObjectAdapter candidateVal = candidateRes.value();
+ CacheObjectAdapter newestVal = newestRes.value();
+
+ try {
+ byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+ byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+
+ if (!Arrays.equals(candidateBytes, newestBytes))
+ fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+
+ return;
+ }
+ }
+ }
}
- else if (verCompareRes > 0)
- fixedMap.put(key, newestRes);
- else if (!newestVal.equals(candidateVal)) // Same version.
- fixedMap.put(key, /*random from entries with same version*/ candidateRes); // Fixing values inconsistency.
+ else if (newestRes != null)
+ fixedMap.put(key, newestRes); // Existing data wins.
}
}
+ assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
+
if (!fixedMap.isEmpty()) {
tx.finishFuture().listen(future -> {
TransactionState state = tx.state();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
index f70b801..01c0446 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
@@ -55,7 +55,8 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
cache.withReadRepair().getAsync(key).get() :
cache.withReadRepair().get(key);
- assertEquals(latest, res);
+ if (latest != null)
+ assertEquals(latest, res);
}
};
@@ -76,8 +77,12 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
cache.withReadRepair().getEntriesAsync(keys).get() :
cache.withReadRepair().getEntries(keys);
- for (CacheEntry<Integer, Integer> entry : res)
- assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+ for (CacheEntry<Integer, Integer> entry : res) {
+ Integer latest = data.data.get(entry.getKey()).latest;
+
+ if (latest != null)
+ assertEquals(latest, entry.getValue());
+ }
}
else {
Map<Integer, Integer> res =
@@ -85,8 +90,12 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
cache.withReadRepair().getAllAsync(keys).get() :
cache.withReadRepair().getAll(keys);
- for (Map.Entry<Integer, Integer> entry : res.entrySet())
- assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+ for (Map.Entry<Integer, Integer> entry : res.entrySet()) {
+ Integer latest = data.data.get(entry.getKey()).latest;
+
+ if (latest != null)
+ assertEquals(latest, entry.getValue());
+ }
}
};
@@ -166,7 +175,8 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
cache.getEntry(key).getValue() :
cache.get(key);
- assertEquals(latest, res);
+ if (latest != null)
+ assertEquals(latest, res);
}
};
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
index b77ce42..7ded65a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
@@ -42,14 +43,18 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Before;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -194,7 +199,8 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
assertEquals(node, primaryNode(key, DEFAULT_CACHE_NAME).cluster().localNode());
}
- assertEquals(checkFixed ? latest : null, fixedVal);
+ if (latest != null)
+ assertEquals(checkFixed ? latest : null, fixedVal);
}
assertEquals(checkFixed ? data.data.size() : 0, fixedCnt);
@@ -210,15 +216,17 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
Integer cnt,
boolean raw,
boolean async,
+ boolean misses,
+ boolean nulls,
Consumer<ReadRepairData> c)
throws Exception {
IgniteCache<Integer, Integer> cache = initiator.getOrCreateCache(DEFAULT_CACHE_NAME);
for (int i = 0; i < ThreadLocalRandom.current().nextInt(1, 10); i++) {
- Map<Integer, InconsistentMapping> results = new HashMap<>();
+ Map<Integer, InconsistentMapping> results = new TreeMap<>(); // Sorted to avoid warning.
for (int j = 0; j < cnt; j++) {
- InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey);
+ InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey, misses, nulls);
results.put(iterableKey, res);
}
@@ -247,14 +255,15 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
/**
*
*/
- private InconsistentMapping setDifferentValuesForSameKey(
- int key) throws Exception {
+ private InconsistentMapping setDifferentValuesForSameKey(int key, boolean misses, boolean nulls) throws Exception {
List<Ignite> nodes = new ArrayList<>();
Map<Ignite, Integer> mapping = new HashMap<>();
Ignite primary = primaryNode(key, DEFAULT_CACHE_NAME);
- if (ThreadLocalRandom.current().nextBoolean()) { // Reversed order.
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ if (rnd.nextBoolean()) { // Reversed order.
nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
nodes.add(primary);
}
@@ -263,14 +272,23 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
}
- if (ThreadLocalRandom.current().nextBoolean()) // Random order.
+ if (rnd.nextBoolean()) // Random order.
Collections.shuffle(nodes);
GridCacheVersionManager mgr =
((GridCacheAdapter)(grid(1)).cachex(DEFAULT_CACHE_NAME).cache()).context().shared().versions();
- int val = 0;
- int primVal = -1;
+ int incVal = 0;
+ Integer primVal = null;
+
+ if (misses)
+ nodes = nodes.subList(0, rnd.nextInt(1, nodes.size()));
+
+ int rmvLimit = rnd.nextInt(nodes.size());
+
+ boolean incVer = rnd.nextBoolean();
+
+ GridCacheVersion ver = null;
for (Ignite node : nodes) {
IgniteInternalCache cache = ((IgniteEx)node).cachex(DEFAULT_CACHE_NAME);
@@ -279,9 +297,16 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
GridCacheEntryEx entry = adapter.entryEx(key);
+ if (ver == null || incVer)
+ ver = mgr.next(entry.context().kernalContext().discovery().topologyVersion()); // Incremental version.
+
+ boolean rmv = nulls && rnd.nextBoolean() && --rmvLimit >= 0;
+
+ Integer val = rmv ? null : ++incVal;
+
boolean init = entry.initialValue(
- new CacheObjectImpl(++val, null), // Incremental value.
- mgr.next(entry.context().kernalContext().discovery().topologyVersion()), // Incremental version.
+ new CacheObjectImpl(rmv ? -1 : val, null), // Incremental value.
+ ver,
0,
0,
false,
@@ -290,6 +315,61 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
false,
false);
+ if (rmv) {
+ if (cache.configuration().getAtomicityMode() == ATOMIC)
+ entry.innerUpdate(
+ ver,
+ ((IgniteEx)node).localNode().id(),
+ ((IgniteEx)node).localNode().id(),
+ GridCacheOperation.DELETE,
+ null,
+ null,
+ false,
+ false,
+ false,
+ false,
+ null,
+ false,
+ false,
+ false,
+ false,
+ AffinityTopologyVersion.NONE,
+ null,
+ GridDrType.DR_NONE,
+ 0,
+ 0,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null,
+ null,
+ false);
+ else
+ entry.innerRemove(
+ null,
+ ((IgniteEx)node).localNode().id(),
+ ((IgniteEx)node).localNode().id(),
+ false,
+ false,
+ false,
+ false,
+ false,
+ null,
+ AffinityTopologyVersion.NONE,
+ CU.empty0(),
+ GridDrType.DR_NONE,
+ null,
+ null,
+ null,
+ 1L);
+
+ assertFalse(entry.hasValue());
+ }
+ else
+ assertTrue(entry.hasValue());
+
assertTrue("iterableKey " + key + " already inited", init);
mapping.put(node, val);
@@ -298,11 +378,13 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
primVal = val;
}
- assertEquals(nodes.size(), new HashSet<>(mapping.values()).size()); // Each node have unique value.
+ if (!nulls)
+ assertEquals(nodes.size(), new HashSet<>(mapping.values()).size()); // Each node have unique value.
- assertTrue(primVal != -1); // Primary value set.
+ if (!misses && !nulls)
+ assertTrue(primVal != null); // Primary value set.
- return new InconsistentMapping(mapping, primVal, val);
+ return new InconsistentMapping(mapping, primVal, incVer ? incVal : null /*Any*/);
}
/**
@@ -330,7 +412,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
boolean raw,
boolean async) {
this.cache = cache;
- this.data = new HashMap<>(data);
+ this.data = data;
this.raw = raw;
this.async = async;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
index a055be5..ac65b95 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
@@ -88,8 +88,12 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
cache.withReadRepair().getEntriesAsync(keys).get() :
cache.withReadRepair().getEntries(keys);
- for (CacheEntry<Integer, Integer> entry : res)
- assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+ for (CacheEntry<Integer, Integer> entry : res) {
+ Integer latest = data.data.get(entry.getKey()).latest;
+
+ if (latest != null)
+ assertEquals(latest, entry.getValue());
+ }
}
else {
Map<Integer, Integer> res =
@@ -97,8 +101,12 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
cache.withReadRepair().getAllAsync(keys).get() :
cache.withReadRepair().getAll(keys);
- for (Map.Entry<Integer, Integer> entry : res.entrySet())
- assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+ for (Map.Entry<Integer, Integer> entry : res.entrySet()) {
+ Integer latest = data.data.get(entry.getKey()).latest;
+
+ if (latest != null)
+ assertEquals(latest, entry.getValue());
+ }
}
fail("Should not happen.");
@@ -166,6 +174,8 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
if (all)
GETALL_CHECK_AND_FAIL.accept(data);
@@ -183,6 +193,8 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
if (all)
CONTAINS_ALL_CHECK_AND_FAIL.accept(data);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
index c113c91..2935bc9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
@@ -34,7 +34,7 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepairTest {
/** Test parameters. */
- @Parameterized.Parameters(name = "concurrency={0}, isolation={1}, getEntry={2}, async={3}")
+ @Parameterized.Parameters(name = "concurrency={0}, isolation={1}, getEntry={2}, async={3}, misses={4}, nulls={5}")
public static Collection parameters() {
List<Object[]> res = new ArrayList<>();
@@ -42,7 +42,9 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
for (TransactionIsolation isolation : TransactionIsolation.values()) {
for (boolean raw : new boolean[] {false, true}) {
for (boolean async : new boolean[] {false, true})
- res.add(new Object[] {concurrency, isolation, raw, async});
+ for (boolean misses : new boolean[] {false, true})
+ for (boolean nulls : new boolean[] {false, true})
+ res.add(new Object[] {concurrency, isolation, raw, async, misses, nulls});
}
}
}
@@ -66,6 +68,14 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
@Parameterized.Parameter(3)
public boolean async;
+ /** Misses. */
+ @Parameterized.Parameter(4)
+ public boolean misses;
+
+ /** Nulls. */
+ @Parameterized.Parameter(5)
+ public boolean nulls;
+
/** {@inheritDoc} */
@Override protected void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception {
prepareAndCheck(
@@ -73,6 +83,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
boolean fixByOtherTx = concurrency == TransactionConcurrency.OPTIMISTIC ||
isolation == TransactionIsolation.READ_COMMITTED;
@@ -105,6 +117,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
1,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
try (Transaction tx = initiator.transactions().txStart(concurrency, isolation)) {
GET_NULL.accept(data);
@@ -132,6 +146,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
// "Contains" works like optimistic() || readCommitted() and always fixed by other tx.
boolean fixByOtherTx = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
index 5309cf4..a6019fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
@@ -30,13 +30,15 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepairTest {
/** Test parameters. */
- @Parameterized.Parameters(name = "getEntry={0}, async={1}")
+ @Parameterized.Parameters(name = "getEntry={0}, async={1}, misses={2}, nulls={3}")
public static Collection parameters() {
List<Object[]> res = new ArrayList<>();
for (boolean raw : new boolean[] {false, true}) {
for (boolean async : new boolean[] {false, true})
- res.add(new Object[] {raw, async});
+ for (boolean misses : new boolean[] {false, true})
+ for (boolean nulls : new boolean[] {false, true})
+ res.add(new Object[] {raw, async, misses, nulls});
}
return res;
@@ -50,6 +52,14 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
@Parameterized.Parameter(1)
public boolean async;
+ /** Misses. */
+ @Parameterized.Parameter(2)
+ public boolean misses;
+
+ /** Nulls. */
+ @Parameterized.Parameter(3)
+ public boolean nulls;
+
/** {@inheritDoc} */
@Override protected void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception {
prepareAndCheck(
@@ -57,6 +67,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
if (all)
GETALL_CHECK_AND_FIX.accept(data);
@@ -74,6 +86,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
1,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
GET_NULL.accept(data); // first attempt.
@@ -92,6 +106,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
cnt,
raw,
async,
+ misses,
+ nulls,
(ReadRepairData data) -> {
if (all)
CONTAINS_ALL_CHECK_AND_FIX.accept(data);