You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/03/10 13:09:20 UTC

[ignite] branch master updated: IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0637e  IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)
6b0637e is described below

commit 6b0637eb52c57a899274b6f6c0976520711100a3
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Thu Mar 10 16:08:46 2022 +0300

    IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)
---
 .../testsuites/IgniteControlUtilityTestSuite.java  |   2 +
 ...GridCommandHandlerConsistencySensitiveTest.java |  28 ++++++
 .../util/GridCommandHandlerConsistencyTest.java    |  53 +++++++++++
 .../events/CacheConsistencyViolationEvent.java     |  21 ++++-
 .../cache/distributed/near/GridNearTxLocal.java    |   2 +-
 .../GridNearReadRepairAbstractFuture.java          | 103 +++++++++++++++++++--
 .../near/consistency/GridNearReadRepairFuture.java |  20 ----
 .../consistency/VisorConsistencyRepairTask.java    |  57 ++++++++----
 .../consistency/VisorConsistencyRepairTaskArg.java |  19 ----
 .../consistency/VisorConsistencyStatusTask.java    |  12 +--
 .../cache/consistency/AbstractReadRepairTest.java  |  15 ++-
 11 files changed, 251 insertions(+), 81 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index bfefe21..02153ae 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
+import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest;
 import org.apache.ignite.util.GridCommandHandlerConsistencyTest;
 import org.apache.ignite.util.GridCommandHandlerDefragmentationTest;
 import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest;
@@ -93,6 +94,7 @@ import org.junit.runners.Suite;
 
     GridCommandHandlerConsistencyTest.class,
     GridCommandHandlerConsistencyBinaryTest.class,
+    GridCommandHandlerConsistencySensitiveTest.class,
 
     SystemViewCommandTest.class,
     MetricCommandTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java
new file mode 100644
index 0000000..64e212c
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+/**
+ * Runs the inherited consistency command tests with IGNITE_TO_STRING_INCLUDE_SENSITIVE set to {@code "false"}.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false")
+public class GridCommandHandlerConsistencySensitiveTest extends GridCommandHandlerConsistencyTest {
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
index f3c8020..7465c33 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
@@ -37,7 +37,11 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyStatusTask;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,6 +52,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
 import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.LogListener.matches;
 
 /**
  *
@@ -67,6 +72,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
     private static final int PARTITIONS = 32;
 
     /** */
+    protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+    /** */
     @Parameterized.Parameters(name = "strategy={0}")
     public static Iterable<Object[]> data() {
         List<Object[]> res = new ArrayList<>();
@@ -103,6 +111,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
 
         cfg.setDataStorageConfiguration(null);
 
+        cfg.setGridLogger(listeningLog);
+
         return cfg;
     }
 
@@ -138,6 +148,27 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
 
         injectTestSystemOut();
 
+        LogListener lsnrUnmaskedKey = matches("Key: 0 (cache: ").build(); // Unmasked key.
+        LogListener lsnrMaskedKey = matches("Key: [HIDDEN_KEY#").build(); // Masked key.
+        LogListener lsnrMaskedVal = matches("Value: [HIDDEN_VALUE#").build(); // Masked value.
+
+        listeningLog.registerListener(lsnrUnmaskedKey);
+        listeningLog.registerListener(lsnrMaskedKey);
+        listeningLog.registerListener(lsnrMaskedVal);
+
+        List<LogListener> listeners = new ArrayList<>();
+
+        // A bare "Key:" match count cannot be checked until https://issues.apache.org/jira/browse/IGNITE-15316 is fixed.
+        if (S.includeSensitive()) {
+            for (int i = 0; i < PARTITIONS; i++) {
+                LogListener keyListener = matches("Key: " + i + " (cache: ").build();
+
+                listeningLog.registerListener(keyListener);
+
+                listeners.add(keyListener);
+            }
+        }
+
         assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
         assertContains(log, testOut.toString(),
             "conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get());
@@ -146,13 +177,30 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
 
         readRepair(brokenParts, txCacheName, fixesPerEntry);
 
+        if (S.includeSensitive()) {
+            for (LogListener listener : listeners) {
+                assertTrue(listener.check());
+
+                listener.reset();
+            }
+        }
+
         if (fixesPerEntry != null && fixesPerEntry > 0)
             assertEquals(PARTITIONS, brokenParts.get()); // Half fixed.
 
         readRepair(brokenParts, atomicCacheName, fixesPerEntry != null ? 0 : null);
 
+        if (S.includeSensitive()) {
+            for (LogListener listener : listeners)
+                assertTrue(listener.check());
+        }
+
         if (fixesPerEntry != null && fixesPerEntry > 0)
             assertEquals(PARTITIONS, brokenParts.get()); // Atomics still broken.
+
+        assertEquals(S.includeSensitive(), lsnrUnmaskedKey.check());
+        assertEquals(S.includeSensitive(), !lsnrMaskedKey.check());
+        assertEquals(S.includeSensitive(), !lsnrMaskedVal.check());
     }
 
     /**
@@ -212,6 +260,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
                     ConsistencyCommand.PARTITION, String.valueOf(i),
                     ConsistencyCommand.STRATEGY, strategy.toString()));
 
+            assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());
+
             assertContains(log, testOut.toString(), "Cache not found");
         }
     }
@@ -225,6 +275,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
                 ConsistencyCommand.CACHE, cacheName,
                 ConsistencyCommand.PARTITION, String.valueOf(i),
                 ConsistencyCommand.STRATEGY, strategy.toString()));
+
+            assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());
+
             assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
             assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
 
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
index d02bf6e..bf348c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
@@ -68,7 +68,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Represents original values of entries.*/
-    private final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+    private final Map<Object, EntriesInfo> entries;
 
     /** Fixed entries. */
     private final Map<Object, Object> fixed;
@@ -92,7 +92,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
         String cacheName,
         ClusterNode node,
         String msg,
-        Map<Object, Map<ClusterNode, EntryInfo>> entries,
+        Map<Object, EntriesInfo> entries,
         Map<Object, Object> fixed,
         ReadRepairStrategy strategy) {
         super(node, msg, EVT_CONSISTENCY_VIOLATION);
@@ -108,7 +108,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
      *
      * @return Collection of original entries.
      */
-    public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() {
+    public Map<Object, EntriesInfo> getEntries() {
         return entries;
     }
 
@@ -140,6 +140,21 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
     }
 
     /**
+     * Inconsistent entries mapping: the partition of a key and the {@link EntryInfo} recorded per node.
+     */
+    public interface EntriesInfo {
+        /**
+         * @return Entry's mapping (node to the entry info recorded for that node).
+         */
+        public Map<ClusterNode, EntryInfo> getMapping();
+
+        /**
+         * @return Entry's partition.
+         */
+        public int partition();
+    }
+
+    /**
      * Inconsistent entry info.
      */
     public interface EntryInfo {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cbea2b4..3639a04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -2500,7 +2500,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                                         keepCacheObjects,
                                                         deserializeBinary,
                                                         false,
-                                                        getRes,
+                                                        null,
                                                         getRes.version(),
                                                         0,
                                                         0,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
index c0cdb3d..fef2f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
@@ -25,9 +25,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.cache.ReadRepairStrategy;
 import org.apache.ignite.cluster.ClusterNode;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -335,7 +338,12 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
         if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION))
             return;
 
-        Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>();
+        boolean includeSensitive = S.includeSensitive();
+
+        Map<KeyCacheObject, Object> sensitiveKeyMap = new HashMap<>();
+        Map<ByteArrayWrapper, Object> sensitiveValMap = new HashMap<>();
+
+        Map<Object, CacheConsistencyViolationEvent.EntriesInfo> entries = new HashMap<>();
 
         for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) {
             ClusterNode node = pair.getKey();
@@ -344,21 +352,24 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
 
             for (KeyCacheObject key : fut.keys()) {
                 if (inconsistentKeys.contains(key)) {
-                    Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
-                        entries.computeIfAbsent(
-                            ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>());
+                    sensitiveKeyMap.computeIfAbsent(key, k -> includeSensitive
+                        ? ctx.unwrapBinaryIfNeeded(k, !deserializeBinary, false, null)
+                        : "[HIDDEN_KEY#" + UUID.randomUUID() + "]");
+
+                    CacheConsistencyViolationEvent.EntriesInfo entriesInfo =
+                        entries.computeIfAbsent(sensitiveKeyMap.get(key), k -> new EventEntriesInfo(key.partition()));
 
                     EntryGetResult res = fut.result().get(key);
                     CacheEntryVersion ver = res != null ? res.version() : null;
 
-                    Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null;
+                    Object val = sensitiveValue(includeSensitive, res, sensitiveValMap);
 
                     boolean primary = primaries.get(key).equals(fut.affNode());
                     boolean correct = fixedEntries != null &&
                         ((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) ||
                             (fixedEntries.get(key) == null && res == null));
 
-                    map.put(node, new EventEntryInfo(val, ver, primary, correct));
+                    entriesInfo.getMapping().put(node, new EventEntryInfo(val, ver, primary, correct));
                 }
             }
         }
@@ -371,9 +382,8 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
             fixed = new HashMap<>();
 
             for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) {
-                Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null);
-                Object val = entry.getValue() != null ?
-                    ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null;
+                Object key = sensitiveKeyMap.get(entry.getKey());
+                Object val = sensitiveValue(includeSensitive, entry.getValue(), sensitiveValMap);
 
                 fixed.put(key, val);
             }
@@ -391,6 +401,58 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
     /**
      *
      */
+    private Object sensitiveValue(boolean includeSensitive, EntryGetResult res,
+        Map<ByteArrayWrapper, Object> sensitiveValMap) {
+        if (res != null) {
+            CacheObject val = res.value();
+
+            try {
+                ByteArrayWrapper wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+                // Cache by value bytes: equal values share one unwrapped object or one random HIDDEN_VALUE placeholder.
+                return sensitiveValMap.computeIfAbsent(wrapped, w ->
+                    includeSensitive ?
+                        ctx.unwrapBinaryIfNeeded(val, !deserializeBinary, false, null) :
+                        "[HIDDEN_VALUE#" + UUID.randomUUID() + "]");
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to unmarshall object.", e);
+            }
+        }
+        else
+            return null;
+    }
+
+    /**
+     * {@link CacheConsistencyViolationEvent.EntriesInfo} implementation holding a partition and a per-node mapping.
+     */
+    private static final class EventEntriesInfo implements CacheConsistencyViolationEvent.EntriesInfo {
+        /** Mapping (node to entry info); returned as-is by {@link #getMapping()}, so callers can add to it. */
+        final Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping = new HashMap<>();
+
+        /** Partition. */
+        final int partition;
+
+        /**
+         * @param partition Partition.
+         */
+        public EventEntriesInfo(int partition) {
+            this.partition = partition;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> getMapping() {
+            return mapping;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition() {
+            return partition;
+        }
+    }
+
+    /**
+     *
+     */
     private static final class EventEntryInfo implements CacheConsistencyViolationEvent.EntryInfo {
         /** Value. */
         final Object val;
@@ -440,4 +502,27 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
             return correct;
         }
     }
+
+    /**
+     * Byte array holder whose equality and hash code are based on the array content.
+     */
+    protected static final class ByteArrayWrapper {
+        /** Array. */
+        final byte[] arr;
+
+        /** @param arr Array to wrap (stored by reference, not copied). */
+        public ByteArrayWrapper(byte[] arr) {
+            this.arr = arr;
+        }
+
+        /** Compares array content. NOTE(review): {@code o} is cast unchecked, so null or another type throws. */
+        @Override public boolean equals(Object o) {
+            return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+        }
+
+        /** Hash code of the array content. */
+        @Override public int hashCode() {
+            return Arrays.hashCode(arr);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
index 2f6c15b..f87e4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
@@ -240,26 +240,6 @@ public class GridNearReadRepairFuture extends GridNearReadRepairAbstractFuture {
      */
     public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
         throws IgniteCheckedException {
-        /** */
-        class ByteArrayWrapper {
-            final byte[] arr;
-
-            /** */
-            public ByteArrayWrapper(byte[] arr) {
-                this.arr = arr;
-            }
-
-            /** */
-            @Override public boolean equals(Object o) {
-                return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
-            }
-
-            /** */
-            @Override public int hashCode() {
-                return Arrays.hashCode(arr);
-            }
-        }
-
         Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
         Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
index 0948c30..7914da1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
@@ -117,7 +117,17 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
             log.info("Consistency check started " +
                 "[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + ", strategy=" + strategy + "]");
 
-            VisorConsistencyStatusTask.MAP.put(arg, "0/" + part.fullSize());
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("[node=").append(ignite.localNode());
+            sb.append(", cacheGroup=").append(grpCtx.cacheOrGroupName());
+            sb.append(", part=").append(p).append("]");
+
+            String statusKey = sb.toString();
+
+            if (VisorConsistencyStatusTask.MAP.putIfAbsent(statusKey, "0/" + part.fullSize()) != null)
+                throw new IllegalStateException("Consistency check already started " +
+                    "[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + "]");
 
             long cnt = 0;
             long statusTs = 0;
@@ -125,7 +135,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
             part.reserve();
 
             try {
-                IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener();
+                IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener(cacheName);
 
                 ignite.events().localListen(lsnr, EVT_CONSISTENCY_VIOLATION);
 
@@ -169,7 +179,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
                             log.info("Consistency check progress [grp=" + grpCtx.cacheOrGroupName() +
                                 ", part=" + p + ", checked=" + cnt + "/" + part.fullSize() + "]");
 
-                            VisorConsistencyStatusTask.MAP.put(arg, cnt + "/" + part.fullSize());
+                            VisorConsistencyStatusTask.MAP.put(statusKey, cnt + "/" + part.fullSize());
                         }
 
                     }
@@ -185,11 +195,11 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
             finally {
                 part.release();
 
-                VisorConsistencyStatusTask.MAP.remove(arg);
+                VisorConsistencyStatusTask.MAP.remove(statusKey);
             }
 
             if (!evts.isEmpty())
-                return processEvents(cctx, p, cnt);
+                return processEvents(p, cnt);
             else
                 return NOTHING_FOUND + " [processed=" + cnt + "]\n";
         }
@@ -197,41 +207,43 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
         /**
          *
          */
-        private String processEvents(GridCacheContext<Object, Object> cctx, int part, long cnt) {
+        private String processEvents(int part, long cnt) {
             int found = 0;
             int fixed = 0;
 
             StringBuilder sb = new StringBuilder();
 
             for (CacheConsistencyViolationEvent evt : evts) {
-                for (Map.Entry<?, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entry : evt.getEntries().entrySet()) {
+                for (Map.Entry<?, CacheConsistencyViolationEvent.EntriesInfo> entry : evt.getEntries().entrySet()) {
                     Object key = entry.getKey();
 
-                    if (cctx.affinity().partition(key) != part)
+                    if (entry.getValue().partition() != part)
                         continue; // Skipping other partitions results, which are generated by concurrent executions.
 
                     found++;
 
                     sb.append("Key: ").append(key)
                         .append(" (cache: ").append(evt.getCacheName())
-                        .append(", strategy: ").append(evt.getStrategy()).append(")").append("\n");
+                        .append(", partition: ").append(entry.getValue().partition())
+                        .append(", strategy: ").append(evt.getStrategy())
+                        .append(")").append("\n");
 
                     if (evt.getFixedEntries().containsKey(key))
                         sb.append(" Fixed: ").append(evt.getFixedEntries().get(key)).append("\n");
 
-                    for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping : entry.getValue().entrySet()) {
+                    for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping :
+                        entry.getValue().getMapping().entrySet()) {
                         ClusterNode node = mapping.getKey();
                         CacheConsistencyViolationEvent.EntryInfo info = mapping.getValue();
 
                         sb.append("  Node: ").append(node).append("\n")
                             .append("    Value: ").append(info.getValue()).append("\n")
-                            .append("    Version: ").append(info.getVersion()).append("\n");
+                            .append("    Version: ").append(info.getVersion()).append("\n")
+                            .append("    On primary: ").append(info.isPrimary()).append("\n");
 
                         if (info.getVersion() != null)
                             sb.append("    Other cluster version: ").append(info.getVersion().otherClusterVersion()).append("\n");
 
-                        sb.append("    On primary: ").append(info.isPrimary()).append("\n");
-
                         if (info.isCorrect())
                             sb.append("    Considered as a CORRECT value!").append("\n");
                     }
@@ -259,13 +271,26 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
             /** Serial version uid. */
             private static final long serialVersionUID = 0L;
 
+            /** Cache name. */
+            private final String cacheName;
+
+            /**
+             * @param name Name of the cache whose violation events are collected.
+             */
+            private CacheConsistencyViolationEventListener(String name) {
+                cacheName = name;
+            }
+
             /**
              * {@inheritDoc}
              */
-            @Override public boolean apply(CacheConsistencyViolationEvent e) {
-                assert e instanceof CacheConsistencyViolationEvent;
+            @Override public boolean apply(CacheConsistencyViolationEvent evt) {
+                assert evt instanceof CacheConsistencyViolationEvent;
+
+                if (!evt.getCacheName().equals(cacheName))
+                   return true; // Skipping other caches results, which are generated by concurrent executions.
 
-                evts.add(e);
+                evts.add(evt);
 
                 return true;
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
index 50c8b3f..6008d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.consistency;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.Objects;
 import org.apache.ignite.cache.ReadRepairStrategy;
 import org.apache.ignite.internal.dto.IgniteDataTransferObject;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -92,22 +91,4 @@ public class VisorConsistencyRepairTaskArg extends IgniteDataTransferObject {
     public ReadRepairStrategy strategy() {
         return strategy;
     }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        VisorConsistencyRepairTaskArg arg = (VisorConsistencyRepairTaskArg)o;
-
-        return part == arg.part && Objects.equals(cacheName, arg.cacheName);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return Objects.hash(cacheName, part);
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
index af001e6..60f43d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
@@ -37,7 +37,7 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St
     public static final String NOTHING_FOUND = "Consistency check/repair operations were NOT found.";
 
     /** Status map. */
-    public static final ConcurrentHashMap<VisorConsistencyRepairTaskArg, String> MAP = new ConcurrentHashMap<>();
+    public static final ConcurrentHashMap<String, String> MAP = new ConcurrentHashMap<>();
 
     /** {@inheritDoc} */
     @Override protected VisorJob<Void, String> job(Void arg) {
@@ -80,13 +80,9 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St
 
             StringBuilder sb = new StringBuilder();
 
-            for (Map.Entry<VisorConsistencyRepairTaskArg, String> entry : MAP.entrySet()) {
-                VisorConsistencyRepairTaskArg args = entry.getKey();
-                String status = entry.getValue();
-
-                sb.append("\n    Cache: ").append(args.cacheName()).append("\n")
-                    .append("    Partition: ").append(args.part()).append("\n")
-                    .append("    Status: ").append(status).append("\n");
+            for (Map.Entry<String, String> entry : MAP.entrySet()) {
+                sb.append("\n    Job: ").append(entry.getKey()).append("\n")
+                    .append("    Status: ").append(entry.getValue()).append("\n");
             }
 
             return sb.toString();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
index 72358b4..4e1dcec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
@@ -77,7 +78,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
     private static final ConcurrentLinkedDeque<CacheConsistencyViolationEvent> evtDeq = new ConcurrentLinkedDeque<>();
 
     /** Key. */
-    protected static int iterableKey;
+    private static final AtomicInteger iterableKey = new AtomicInteger();
 
     /** Backups count. */
     protected Integer backupsCount() {
@@ -140,7 +141,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
-        log.info("Checked " + iterableKey + " keys");
+        log.info("Checked " + iterableKey.get() + " keys");
 
         stopAllGrids();
 
@@ -204,7 +205,9 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
                 assertEquals(atomicityMode() == TRANSACTIONAL ? data.strategy : ReadRepairStrategy.CHECK_ONLY, evt.getStrategy());
 
                 // Optimistic and read committed transactions produce per key fixes.
-                evtEntries.putAll(evt.getEntries());
+                for (Map.Entry<Object, CacheConsistencyViolationEvent.EntriesInfo> entries : evt.getEntries().entrySet())
+                    evtEntries.put(entries.getKey(), entries.getValue().getMapping());
+
                 evtFixed.putAll(evt.getFixedEntries());
             }
         }
@@ -277,9 +280,11 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
 
             try {
                 for (int j = 0; j < cnt; j++) {
-                    InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey, misses, nulls, strategy);
+                    int curKey = iterableKey.incrementAndGet();
+
+                    InconsistentMapping res = setDifferentValuesForSameKey(curKey, misses, nulls, strategy);
 
-                    results.put(iterableKey, res);
+                    results.put(curKey, res);
                 }
 
                 for (Ignite node : G.allGrids()) { // Check that cache filled properly.