You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/11/02 11:14:09 UTC

[ignite] branch master updated: IGNITE-15715 Missed values should be checked by Read Repair (#9505)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a4195e  IGNITE-15715 Missed values should be checked by Read Repair (#9505)
4a4195e is described below

commit 4a4195e4e4a9c59933713fa29fad3d282028230c
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Tue Nov 2 14:13:40 2021 +0300

    IGNITE-15715 Missed values should be checked by Read Repair (#9505)
---
 .../dht/CacheDistributedGetFutureAdapter.java      |   7 ++
 .../GridNearReadRepairCheckOnlyFuture.java         |  55 ++++++----
 .../near/consistency/GridNearReadRepairFuture.java |  72 ++++++++-----
 .../consistency/AbstractFullSetReadRepairTest.java |  22 ++--
 .../cache/consistency/AbstractReadRepairTest.java  | 112 ++++++++++++++++++---
 .../cache/consistency/AtomicReadRepairTest.java    |  20 +++-
 .../ExplicitTransactionalReadRepairTest.java       |  20 +++-
 .../ImplicitTransactionalReadRepairTest.java       |  20 +++-
 8 files changed, 256 insertions(+), 72 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 6f52d98..a0b805c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -203,6 +203,13 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
     }
 
     /**
+     * @return Keys.
+     */
+    public Collection<KeyCacheObject> keys() {
+        return keys;
+    }
+
+    /**
      * @param part Partition.
      * @return {@code True} if partition is in owned state.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
index 8573127..48d7ce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
@@ -17,14 +17,16 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
@@ -33,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartition
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 
@@ -94,33 +95,44 @@ public class GridNearReadRepairCheckOnlyFuture extends GridNearReadRepairAbstrac
 
     /** {@inheritDoc} */
     @Override protected void reduce() {
-        Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>();
-
-        Map<KeyCacheObject, T2<Object, CacheEntryVersion>> prevMap = new HashMap<>();
-
+        Map<KeyCacheObject, EntryGetResult> resMap = new HashMap<>(keys.size());
         Set<KeyCacheObject> inconsistentKeys = new HashSet<>();
 
         for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
-            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
-                KeyCacheObject curKey = entry.getKey();
-                EntryGetResult curRes = entry.getValue();
+            for (KeyCacheObject key : fut.keys()) {
+                EntryGetResult curRes = fut.result().get(key);
 
-                Object curVal = ctx.unwrapBinaryIfNeeded(curRes.value(), !deserializeBinary, false, null);
+                if (!resMap.containsKey(key)) {
+                    resMap.put(key, curRes);
 
-                T2<Object, CacheEntryVersion> prev = prevMap.get(curKey);
+                    continue;
+                }
 
-                if (prev != null) {
-                    Object prevVal = prev.get1();
-                    CacheEntryVersion prevVer = prev.get2();
+                EntryGetResult prevRes = resMap.get(key);
 
-                    if (prevVer.compareTo(curRes.version()) != 0 || !prevVal.equals(curVal))
-                        inconsistentKeys.add(curKey);
-                }
-                else {
-                    resMap.put(curKey, curRes);
+                if (curRes != null) {
+                    if (prevRes == null || prevRes.version().compareTo(curRes.version()) != 0)
+                        inconsistentKeys.add(key);
+                    else {
+                        CacheObjectAdapter curVal = curRes.value();
+                        CacheObjectAdapter prevVal = prevRes.value();
 
-                    prevMap.put(curKey, new T2<>(curVal, curRes.version()));
+                        try {
+                            byte[] curBytes = curVal.valueBytes(ctx.cacheObjectContext());
+                            byte[] prevBytes = prevVal.valueBytes(ctx.cacheObjectContext());
+
+                            if (!Arrays.equals(curBytes, prevBytes))
+                                inconsistentKeys.add(key);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+
+                            return;
+                        }
+                    }
                 }
+                else if (prevRes != null)
+                    inconsistentKeys.add(key);
             }
         }
 
@@ -137,6 +149,9 @@ public class GridNearReadRepairCheckOnlyFuture extends GridNearReadRepairAbstrac
             return;
         }
 
+        // Misses recorded to detect partial misses, but should not be propagated when the key is null at each node.
+        resMap.values().removeIf(Objects::isNull);
+
         onDone(resMap);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
index 5f4dc16..1bb064e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
@@ -17,17 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near.consistency;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.transactions.TransactionState;
 
 /**
@@ -73,37 +75,61 @@ public class GridNearReadRepairFuture extends GridNearReadRepairAbstractFuture {
 
     /** {@inheritDoc} */
     @Override protected void reduce() {
-        Map<KeyCacheObject, T2<EntryGetResult, Object>> newestMap = new HashMap<>();
-        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>();
+        Map<KeyCacheObject, EntryGetResult> newestMap = new HashMap<>(keys.size()); // Newest entries (by version).
+        Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(); // Newest entries required to be re-committed.
 
         for (GridPartitionedGetFuture<KeyCacheObject, EntryGetResult> fut : futs.values()) {
-            for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fut.result().entrySet()) {
-                KeyCacheObject key = entry.getKey();
+            for (KeyCacheObject key : fut.keys()) {
+                EntryGetResult candidateRes = fut.result().get(key);
 
-                EntryGetResult candidateRes = entry.getValue();
+                if (!newestMap.containsKey(key)) {
+                    newestMap.put(key, candidateRes);
 
-                Object candidateVal = ctx.unwrapBinaryIfNeeded(candidateRes.value(), !deserializeBinary, false, null);
-
-                newestMap.putIfAbsent(key, new T2<>(candidateRes, candidateVal));
-
-                T2<EntryGetResult, Object> newest = newestMap.get(key);
-
-                EntryGetResult newestRes = newest.get1();
-                Object newestVal = newest.get2();
-
-                int verCompareRes = newestRes.version().compareTo(candidateRes.version());
+                    continue;
+                }
 
-                if (verCompareRes < 0) {
-                    newestMap.put(key, new T2<>(candidateRes, candidateVal));
-                    fixedMap.put(key, candidateRes);
+                EntryGetResult newestRes = newestMap.get(key);
+
+                if (candidateRes != null) {
+                    if (newestRes == null) { // Existing data wins.
+                        newestMap.put(key, candidateRes);
+                        fixedMap.put(key, candidateRes);
+                    }
+                    else {
+                        int compareRes = candidateRes.version().compareTo(newestRes.version());
+
+                        if (compareRes > 0) { // Newest data wins.
+                            newestMap.put(key, candidateRes);
+                            fixedMap.put(key, candidateRes);
+                        }
+                        else if (compareRes < 0)
+                            fixedMap.put(key, newestRes);
+                        else if (compareRes == 0) {
+                            CacheObjectAdapter candidateVal = candidateRes.value();
+                            CacheObjectAdapter newestVal = newestRes.value();
+
+                            try {
+                                byte[] candidateBytes = candidateVal.valueBytes(ctx.cacheObjectContext());
+                                byte[] newestBytes = newestVal.valueBytes(ctx.cacheObjectContext());
+
+                                if (!Arrays.equals(candidateBytes, newestBytes))
+                                    fixedMap.put(key, newestRes); // Same version, fixing values inconsistency.
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+
+                                return;
+                            }
+                        }
+                    }
                 }
-                else if (verCompareRes > 0)
-                    fixedMap.put(key, newestRes);
-                else if (!newestVal.equals(candidateVal)) // Same version.
-                    fixedMap.put(key, /*random from entries with same version*/ candidateRes); // Fixing values inconsistency.
+                else if (newestRes != null)
+                    fixedMap.put(key, newestRes); // Existing data wins.
             }
         }
 
+        assert !fixedMap.containsValue(null) : "null should never be considered as a fix";
+
         if (!fixedMap.isEmpty()) {
             tx.finishFuture().listen(future -> {
                 TransactionState state = tx.state();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
index f70b801..01c0446 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractFullSetReadRepairTest.java
@@ -55,7 +55,8 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
                         cache.withReadRepair().getAsync(key).get() :
                         cache.withReadRepair().get(key);
 
-            assertEquals(latest, res);
+            if (latest != null)
+                assertEquals(latest, res);
         }
     };
 
@@ -76,8 +77,12 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
                     cache.withReadRepair().getEntriesAsync(keys).get() :
                     cache.withReadRepair().getEntries(keys);
 
-            for (CacheEntry<Integer, Integer> entry : res)
-                assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+            for (CacheEntry<Integer, Integer> entry : res) {
+                Integer latest = data.data.get(entry.getKey()).latest;
+
+                if (latest != null)
+                    assertEquals(latest, entry.getValue());
+            }
         }
         else {
             Map<Integer, Integer> res =
@@ -85,8 +90,12 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
                     cache.withReadRepair().getAllAsync(keys).get() :
                     cache.withReadRepair().getAll(keys);
 
-            for (Map.Entry<Integer, Integer> entry : res.entrySet())
-                assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+            for (Map.Entry<Integer, Integer> entry : res.entrySet()) {
+                Integer latest = data.data.get(entry.getKey()).latest;
+
+                if (latest != null)
+                    assertEquals(latest, entry.getValue());
+            }
         }
     };
 
@@ -166,7 +175,8 @@ public abstract class AbstractFullSetReadRepairTest extends AbstractReadRepairTe
                 cache.getEntry(key).getValue() :
                 cache.get(key);
 
-            assertEquals(latest, res);
+            if (latest != null)
+                assertEquals(latest, res);
         }
     };
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
index b77ce42..7ded65a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
@@ -42,14 +43,18 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Before;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -194,7 +199,8 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
                         assertEquals(node, primaryNode(key, DEFAULT_CACHE_NAME).cluster().localNode());
                 }
 
-            assertEquals(checkFixed ? latest : null, fixedVal);
+            if (latest != null)
+                assertEquals(checkFixed ? latest : null, fixedVal);
         }
 
         assertEquals(checkFixed ? data.data.size() : 0, fixedCnt);
@@ -210,15 +216,17 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
         Integer cnt,
         boolean raw,
         boolean async,
+        boolean misses,
+        boolean nulls,
         Consumer<ReadRepairData> c)
         throws Exception {
         IgniteCache<Integer, Integer> cache = initiator.getOrCreateCache(DEFAULT_CACHE_NAME);
 
         for (int i = 0; i < ThreadLocalRandom.current().nextInt(1, 10); i++) {
-            Map<Integer, InconsistentMapping> results = new HashMap<>();
+            Map<Integer, InconsistentMapping> results = new TreeMap<>(); // Sorted to avoid warning.
 
             for (int j = 0; j < cnt; j++) {
-                InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey);
+                InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey, misses, nulls);
 
                 results.put(iterableKey, res);
             }
@@ -247,14 +255,15 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private InconsistentMapping setDifferentValuesForSameKey(
-        int key) throws Exception {
+    private InconsistentMapping setDifferentValuesForSameKey(int key, boolean misses, boolean nulls) throws Exception {
         List<Ignite> nodes = new ArrayList<>();
         Map<Ignite, Integer> mapping = new HashMap<>();
 
         Ignite primary = primaryNode(key, DEFAULT_CACHE_NAME);
 
-        if (ThreadLocalRandom.current().nextBoolean()) { // Reversed order.
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        if (rnd.nextBoolean()) { // Reversed order.
             nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
             nodes.add(primary);
         }
@@ -263,14 +272,23 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
             nodes.addAll(backupNodes(key, DEFAULT_CACHE_NAME));
         }
 
-        if (ThreadLocalRandom.current().nextBoolean()) // Random order.
+        if (rnd.nextBoolean()) // Random order.
             Collections.shuffle(nodes);
 
         GridCacheVersionManager mgr =
             ((GridCacheAdapter)(grid(1)).cachex(DEFAULT_CACHE_NAME).cache()).context().shared().versions();
 
-        int val = 0;
-        int primVal = -1;
+        int incVal = 0;
+        Integer primVal = null;
+
+        if (misses)
+            nodes = nodes.subList(0, rnd.nextInt(1, nodes.size()));
+
+        int rmvLimit = rnd.nextInt(nodes.size());
+
+        boolean incVer = rnd.nextBoolean();
+
+        GridCacheVersion ver = null;
 
         for (Ignite node : nodes) {
             IgniteInternalCache cache = ((IgniteEx)node).cachex(DEFAULT_CACHE_NAME);
@@ -279,9 +297,16 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
 
             GridCacheEntryEx entry = adapter.entryEx(key);
 
+            if (ver == null || incVer)
+                ver = mgr.next(entry.context().kernalContext().discovery().topologyVersion()); // Incremental version.
+
+            boolean rmv = nulls && rnd.nextBoolean() && --rmvLimit >= 0;
+
+            Integer val = rmv ? null : ++incVal;
+
             boolean init = entry.initialValue(
-                new CacheObjectImpl(++val, null), // Incremental value.
-                mgr.next(entry.context().kernalContext().discovery().topologyVersion()), // Incremental version.
+                new CacheObjectImpl(rmv ? -1 : val, null), // Incremental value.
+                ver,
                 0,
                 0,
                 false,
@@ -290,6 +315,61 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
                 false,
                 false);
 
+            if (rmv) {
+                if (cache.configuration().getAtomicityMode() == ATOMIC)
+                    entry.innerUpdate(
+                        ver,
+                        ((IgniteEx)node).localNode().id(),
+                        ((IgniteEx)node).localNode().id(),
+                        GridCacheOperation.DELETE,
+                        null,
+                        null,
+                        false,
+                        false,
+                        false,
+                        false,
+                        null,
+                        false,
+                        false,
+                        false,
+                        false,
+                        AffinityTopologyVersion.NONE,
+                        null,
+                        GridDrType.DR_NONE,
+                        0,
+                        0,
+                        null,
+                        false,
+                        false,
+                        null,
+                        null,
+                        null,
+                        null,
+                        false);
+                else
+                    entry.innerRemove(
+                        null,
+                        ((IgniteEx)node).localNode().id(),
+                        ((IgniteEx)node).localNode().id(),
+                        false,
+                        false,
+                        false,
+                        false,
+                        false,
+                        null,
+                        AffinityTopologyVersion.NONE,
+                        CU.empty0(),
+                        GridDrType.DR_NONE,
+                        null,
+                        null,
+                        null,
+                        1L);
+
+                assertFalse(entry.hasValue());
+            }
+            else
+                assertTrue(entry.hasValue());
+
             assertTrue("iterableKey " + key + " already inited", init);
 
             mapping.put(node, val);
@@ -298,11 +378,13 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
                 primVal = val;
         }
 
-        assertEquals(nodes.size(), new HashSet<>(mapping.values()).size()); // Each node have unique value.
+        if (!nulls)
+            assertEquals(nodes.size(), new HashSet<>(mapping.values()).size()); // Each node have unique value.
 
-        assertTrue(primVal != -1); // Primary value set.
+        if (!misses && !nulls)
+            assertTrue(primVal != null); // Primary value set.
 
-        return new InconsistentMapping(mapping, primVal, val);
+        return new InconsistentMapping(mapping, primVal, incVer ? incVal : null /*Any*/);
     }
 
     /**
@@ -330,7 +412,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
             boolean raw,
             boolean async) {
             this.cache = cache;
-            this.data = new HashMap<>(data);
+            this.data = data;
             this.raw = raw;
             this.async = async;
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
index a055be5..ac65b95 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AtomicReadRepairTest.java
@@ -88,8 +88,12 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
                         cache.withReadRepair().getEntriesAsync(keys).get() :
                         cache.withReadRepair().getEntries(keys);
 
-                for (CacheEntry<Integer, Integer> entry : res)
-                    assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+                for (CacheEntry<Integer, Integer> entry : res) {
+                    Integer latest = data.data.get(entry.getKey()).latest;
+
+                    if (latest != null)
+                        assertEquals(latest, entry.getValue());
+                }
             }
             else {
                 Map<Integer, Integer> res =
@@ -97,8 +101,12 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
                         cache.withReadRepair().getAllAsync(keys).get() :
                         cache.withReadRepair().getAll(keys);
 
-                for (Map.Entry<Integer, Integer> entry : res.entrySet())
-                    assertEquals(data.data.get(entry.getKey()).latest, entry.getValue());
+                for (Map.Entry<Integer, Integer> entry : res.entrySet()) {
+                    Integer latest = data.data.get(entry.getKey()).latest;
+
+                    if (latest != null)
+                        assertEquals(latest, entry.getValue());
+                }
             }
 
             fail("Should not happen.");
@@ -166,6 +174,8 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 if (all)
                     GETALL_CHECK_AND_FAIL.accept(data);
@@ -183,6 +193,8 @@ public class AtomicReadRepairTest extends ImplicitTransactionalReadRepairTest {
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 if (all)
                     CONTAINS_ALL_CHECK_AND_FAIL.accept(data);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
index c113c91..2935bc9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ExplicitTransactionalReadRepairTest.java
@@ -34,7 +34,7 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepairTest {
     /** Test parameters. */
-    @Parameterized.Parameters(name = "concurrency={0}, isolation={1}, getEntry={2}, async={3}")
+    @Parameterized.Parameters(name = "concurrency={0}, isolation={1}, getEntry={2}, async={3}, misses={4}, nulls={5}")
     public static Collection parameters() {
         List<Object[]> res = new ArrayList<>();
 
@@ -42,7 +42,9 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             for (TransactionIsolation isolation : TransactionIsolation.values()) {
                 for (boolean raw : new boolean[] {false, true}) {
                     for (boolean async : new boolean[] {false, true})
-                        res.add(new Object[] {concurrency, isolation, raw, async});
+                        for (boolean misses : new boolean[] {false, true})
+                            for (boolean nulls : new boolean[] {false, true})
+                                res.add(new Object[] {concurrency, isolation, raw, async, misses, nulls});
                 }
             }
         }
@@ -66,6 +68,14 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
     @Parameterized.Parameter(3)
     public boolean async;
 
+    /** Misses. */
+    @Parameterized.Parameter(4)
+    public boolean misses;
+
+    /** Nulls. */
+    @Parameterized.Parameter(5)
+    public boolean nulls;
+
     /** {@inheritDoc} */
     @Override protected void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception {
         prepareAndCheck(
@@ -73,6 +83,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 boolean fixByOtherTx = concurrency == TransactionConcurrency.OPTIMISTIC ||
                     isolation == TransactionIsolation.READ_COMMITTED;
@@ -105,6 +117,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             1,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 try (Transaction tx = initiator.transactions().txStart(concurrency, isolation)) {
                     GET_NULL.accept(data);
@@ -132,6 +146,8 @@ public class ExplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 // "Contains" works like optimistic() || readCommitted() and always fixed by other tx.
                 boolean fixByOtherTx = true;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
index 5309cf4..a6019fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/ImplicitTransactionalReadRepairTest.java
@@ -30,13 +30,15 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepairTest {
     /** Test parameters. */
-    @Parameterized.Parameters(name = "getEntry={0}, async={1}")
+    @Parameterized.Parameters(name = "getEntry={0}, async={1}, misses={2}, nulls={3}")
     public static Collection parameters() {
         List<Object[]> res = new ArrayList<>();
 
         for (boolean raw : new boolean[] {false, true}) {
             for (boolean async : new boolean[] {false, true})
-                res.add(new Object[] {raw, async});
+                for (boolean misses : new boolean[] {false, true})
+                    for (boolean nulls : new boolean[] {false, true})
+                        res.add(new Object[] {raw, async, misses, nulls});
         }
 
         return res;
@@ -50,6 +52,14 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
     @Parameterized.Parameter(1)
     public boolean async;
 
+    /** Misses. */
+    @Parameterized.Parameter(2)
+    public boolean misses;
+
+    /** Nulls. */
+    @Parameterized.Parameter(3)
+    public boolean nulls;
+
     /** {@inheritDoc} */
     @Override protected void testGet(Ignite initiator, Integer cnt, boolean all) throws Exception {
         prepareAndCheck(
@@ -57,6 +67,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 if (all)
                     GETALL_CHECK_AND_FIX.accept(data);
@@ -74,6 +86,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             1,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 GET_NULL.accept(data); // first attempt.
 
@@ -92,6 +106,8 @@ public class ImplicitTransactionalReadRepairTest extends AbstractFullSetReadRepa
             cnt,
             raw,
             async,
+            misses,
+            nulls,
             (ReadRepairData data) -> {
                 if (all)
                     CONTAINS_ALL_CHECK_AND_FIX.accept(data);