You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/03/10 13:09:20 UTC
[ignite] branch master updated: IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 6b0637e IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)
6b0637e is described below
commit 6b0637eb52c57a899274b6f6c0976520711100a3
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Thu Mar 10 16:08:46 2022 +0300
IGNITE-16499 Consistency check command should support IGNITE_TO_STRING_INCLUDE_SENSITIVE option (#9814)
---
.../testsuites/IgniteControlUtilityTestSuite.java | 2 +
...GridCommandHandlerConsistencySensitiveTest.java | 28 ++++++
.../util/GridCommandHandlerConsistencyTest.java | 53 +++++++++++
.../events/CacheConsistencyViolationEvent.java | 21 ++++-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../GridNearReadRepairAbstractFuture.java | 103 +++++++++++++++++++--
.../near/consistency/GridNearReadRepairFuture.java | 20 ----
.../consistency/VisorConsistencyRepairTask.java | 57 ++++++++----
.../consistency/VisorConsistencyRepairTaskArg.java | 19 ----
.../consistency/VisorConsistencyStatusTask.java | 12 +--
.../cache/consistency/AbstractReadRepairTest.java | 15 ++-
11 files changed, 251 insertions(+), 81 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
index bfefe21..02153ae 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.util.GridCommandHandlerCheckIndexesInlineSizeTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest;
+import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest;
import org.apache.ignite.util.GridCommandHandlerConsistencyTest;
import org.apache.ignite.util.GridCommandHandlerDefragmentationTest;
import org.apache.ignite.util.GridCommandHandlerIndexForceRebuildTest;
@@ -93,6 +94,7 @@ import org.junit.runners.Suite;
GridCommandHandlerConsistencyTest.class,
GridCommandHandlerConsistencyBinaryTest.class,
+ GridCommandHandlerConsistencySensitiveTest.class,
SystemViewCommandTest.class,
MetricCommandTest.class,
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java
new file mode 100644
index 0000000..64e212c
--- /dev/null
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencySensitiveTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.util;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+/**
+ * Runs the {@link GridCommandHandlerConsistencyTest} scenarios with the
+ * {@code IGNITE_TO_STRING_INCLUDE_SENSITIVE} system property set to {@code false}.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_TO_STRING_INCLUDE_SENSITIVE, value = "false")
+public class GridCommandHandlerConsistencySensitiveTest extends GridCommandHandlerConsistencyTest {
+}
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
index f3c8020..7465c33 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
@@ -37,7 +37,11 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyStatusTask;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -48,6 +52,7 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.LogListener.matches;
/**
*
@@ -67,6 +72,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
private static final int PARTITIONS = 32;
/** */
+ protected final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
+
+ /** */
@Parameterized.Parameters(name = "strategy={0}")
public static Iterable<Object[]> data() {
List<Object[]> res = new ArrayList<>();
@@ -103,6 +111,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
cfg.setDataStorageConfiguration(null);
+ cfg.setGridLogger(listeningLog);
+
return cfg;
}
@@ -138,6 +148,27 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
injectTestSystemOut();
+ LogListener lsnrUnmaskedKey = matches("Key: 0 (cache: ").build(); // Unmasked key.
+ LogListener lsnrMaskedKey = matches("Key: [HIDDEN_KEY#").build(); // Masked key.
+ LogListener lsnrMaskedVal = matches("Value: [HIDDEN_VALUE#").build(); // Masked value.
+
+ listeningLog.registerListener(lsnrUnmaskedKey);
+ listeningLog.registerListener(lsnrMaskedKey);
+ listeningLog.registerListener(lsnrMaskedVal);
+
+ List<LogListener> listeners = new ArrayList<>();
+
+ // Simply counting "Key:" occurrences is not possible until https://issues.apache.org/jira/browse/IGNITE-15316 is fixed.
+ if (S.includeSensitive()) {
+ for (int i = 0; i < PARTITIONS; i++) {
+ LogListener keyListener = matches("Key: " + i + " (cache: ").build();
+
+ listeningLog.registerListener(keyListener);
+
+ listeners.add(keyListener);
+ }
+ }
+
assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
assertContains(log, testOut.toString(),
"conflict partitions has been found: [counterConflicts=0, hashConflicts=" + brokenParts.get());
@@ -146,13 +177,30 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
readRepair(brokenParts, txCacheName, fixesPerEntry);
+ if (S.includeSensitive()) {
+ for (LogListener listener : listeners) {
+ assertTrue(listener.check());
+
+ listener.reset();
+ }
+ }
+
if (fixesPerEntry != null && fixesPerEntry > 0)
assertEquals(PARTITIONS, brokenParts.get()); // Half fixed.
readRepair(brokenParts, atomicCacheName, fixesPerEntry != null ? 0 : null);
+ if (S.includeSensitive()) {
+ for (LogListener listener : listeners)
+ assertTrue(listener.check());
+ }
+
if (fixesPerEntry != null && fixesPerEntry > 0)
assertEquals(PARTITIONS, brokenParts.get()); // Atomics still broken.
+
+ assertEquals(S.includeSensitive(), lsnrUnmaskedKey.check());
+ assertEquals(S.includeSensitive(), !lsnrMaskedKey.check());
+ assertEquals(S.includeSensitive(), !lsnrMaskedVal.check());
}
/**
@@ -212,6 +260,8 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
ConsistencyCommand.PARTITION, String.valueOf(i),
ConsistencyCommand.STRATEGY, strategy.toString()));
+ assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());
+
assertContains(log, testOut.toString(), "Cache not found");
}
}
@@ -225,6 +275,9 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster
ConsistencyCommand.CACHE, cacheName,
ConsistencyCommand.PARTITION, String.valueOf(i),
ConsistencyCommand.STRATEGY, strategy.toString()));
+
+ assertTrue(VisorConsistencyStatusTask.MAP.isEmpty());
+
assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND);
assertContains(log, testOut.toString(), "[found=1, fixed=" + (fixesPerEntry != null ? fixesPerEntry.toString() : ""));
diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
index d02bf6e..bf348c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/CacheConsistencyViolationEvent.java
@@ -68,7 +68,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
private static final long serialVersionUID = 0L;
/** Represents original values of entries.*/
- private final Map<Object, Map<ClusterNode, EntryInfo>> entries;
+ private final Map<Object, EntriesInfo> entries;
/** Fixed entries. */
private final Map<Object, Object> fixed;
@@ -92,7 +92,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
String cacheName,
ClusterNode node,
String msg,
- Map<Object, Map<ClusterNode, EntryInfo>> entries,
+ Map<Object, EntriesInfo> entries,
Map<Object, Object> fixed,
ReadRepairStrategy strategy) {
super(node, msg, EVT_CONSISTENCY_VIOLATION);
@@ -108,7 +108,7 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
*
* @return Collection of original entries.
*/
- public Map<Object, Map<ClusterNode, EntryInfo>> getEntries() {
+ public Map<Object, EntriesInfo> getEntries() {
return entries;
}
@@ -140,6 +140,21 @@ public class CacheConsistencyViolationEvent extends EventAdapter {
}
/**
+ * Inconsistent entries mapping.
+ */
+ public interface EntriesInfo {
+ /**
+ * @return Info about the entry per cluster node, keyed by node.
+ */
+ public Map<ClusterNode, EntryInfo> getMapping();
+
+ /**
+ * @return Partition of the key this mapping describes.
+ */
+ public int partition();
+ }
+
+ /**
* Inconsistent entry info.
*/
public interface EntryInfo {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cbea2b4..3639a04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -2500,7 +2500,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
keepCacheObjects,
deserializeBinary,
false,
- getRes,
+ null,
getRes.version(),
0,
0,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
index c0cdb3d..fef2f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
@@ -25,9 +25,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.cluster.ClusterNode;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -335,7 +338,12 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
if (!evtMgr.isRecordable(EVT_CONSISTENCY_VIOLATION))
return;
- Map<Object, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entries = new HashMap<>();
+ boolean includeSensitive = S.includeSensitive();
+
+ Map<KeyCacheObject, Object> sensitiveKeyMap = new HashMap<>();
+ Map<ByteArrayWrapper, Object> sensitiveValMap = new HashMap<>();
+
+ Map<Object, CacheConsistencyViolationEvent.EntriesInfo> entries = new HashMap<>();
for (Map.Entry<ClusterNode, GridPartitionedGetFuture<KeyCacheObject, EntryGetResult>> pair : futs.entrySet()) {
ClusterNode node = pair.getKey();
@@ -344,21 +352,24 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
for (KeyCacheObject key : fut.keys()) {
if (inconsistentKeys.contains(key)) {
- Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> map =
- entries.computeIfAbsent(
- ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false, null), k -> new HashMap<>());
+ sensitiveKeyMap.computeIfAbsent(key, k -> includeSensitive
+ ? ctx.unwrapBinaryIfNeeded(k, !deserializeBinary, false, null)
+ : "[HIDDEN_KEY#" + UUID.randomUUID() + "]");
+
+ CacheConsistencyViolationEvent.EntriesInfo entriesInfo =
+ entries.computeIfAbsent(sensitiveKeyMap.get(key), k -> new EventEntriesInfo(key.partition()));
EntryGetResult res = fut.result().get(key);
CacheEntryVersion ver = res != null ? res.version() : null;
- Object val = res != null ? ctx.unwrapBinaryIfNeeded(res.value(), !deserializeBinary, false, null) : null;
+ Object val = sensitiveValue(includeSensitive, res, sensitiveValMap);
boolean primary = primaries.get(key).equals(fut.affNode());
boolean correct = fixedEntries != null &&
((fixedEntries.get(key) != null && fixedEntries.get(key).equals(res)) ||
(fixedEntries.get(key) == null && res == null));
- map.put(node, new EventEntryInfo(val, ver, primary, correct));
+ entriesInfo.getMapping().put(node, new EventEntryInfo(val, ver, primary, correct));
}
}
}
@@ -371,9 +382,8 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
fixed = new HashMap<>();
for (Map.Entry<KeyCacheObject, EntryGetResult> entry : fixedEntries.entrySet()) {
- Object key = ctx.unwrapBinaryIfNeeded(entry.getKey(), !deserializeBinary, false, null);
- Object val = entry.getValue() != null ?
- ctx.unwrapBinaryIfNeeded(entry.getValue().value(), !deserializeBinary, false, null) : null;
+ Object key = sensitiveKeyMap.get(entry.getKey());
+ Object val = sensitiveValue(includeSensitive, entry.getValue(), sensitiveValMap);
fixed.put(key, val);
}
@@ -391,6 +401,58 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
/**
*
*/
+ private Object sensitiveValue(boolean includeSensitive, EntryGetResult res,
+ Map<ByteArrayWrapper, Object> sensitiveValMap) {
+ // A missing result is reported as null regardless of the sensitive-data mode (see the else branch).
+ if (res != null) {
+ // NOTE(review): assumes res.value() is non-null whenever res is non-null - confirm against callers.
+ CacheObject val = res.value();
+
+ try {
+ // The map is keyed by the value's bytes, so byte-equal values share one reported object.
+ ByteArrayWrapper wrapped = new ByteArrayWrapper(val.valueBytes(ctx.cacheObjectContext()));
+
+ // When sensitive data is excluded, a random placeholder is generated once per distinct byte content.
+ return sensitiveValMap.computeIfAbsent(wrapped, w ->
+ includeSensitive ?
+ ctx.unwrapBinaryIfNeeded(val, !deserializeBinary, false, null) :
+ "[HIDDEN_VALUE#" + UUID.randomUUID() + "]");
+ }
+ catch (IgniteCheckedException e) {
+ // Rethrown unchecked with the original exception kept as the cause.
+ throw new IgniteException("Failed to unmarshall object.", e);
+ }
+ }
+ else
+ return null;
+ }
+
+ /**
+ * {@link CacheConsistencyViolationEvent.EntriesInfo} implementation: a mutable per-node mapping plus the key's partition.
+ */
+ private static final class EventEntriesInfo implements CacheConsistencyViolationEvent.EntriesInfo {
+ /** Mapping of cluster node to the entry info for that node; starts empty and is filled through {@link #getMapping()}. */
+ final Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping = new HashMap<>();
+
+ /** Partition. */
+ final int partition;
+
+ /**
+ * @param partition Partition.
+ */
+ public EventEntriesInfo(int partition) {
+ this.partition = partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> getMapping() {
+ return mapping;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partition;
+ }
+ }
+
+ /**
+ *
+ */
private static final class EventEntryInfo implements CacheConsistencyViolationEvent.EntryInfo {
/** Value. */
final Object val;
@@ -440,4 +502,27 @@ public abstract class GridNearReadRepairAbstractFuture extends GridFutureAdapter
return correct;
}
}
+
+ /**
+ * Wraps a byte array so that equality and hash code are based on the array content, making it usable as a map key.
+ */
+ protected static final class ByteArrayWrapper {
+ /** Array. */
+ final byte[] arr;
+
+ /** @param arr Array to wrap; stored by reference, not copied. */
+ public ByteArrayWrapper(byte[] arr) {
+ this.arr = arr;
+ }
+
+ /** Content comparison. NOTE(review): casts without a type or null check, so a foreign or null argument throws. */
+ @Override public boolean equals(Object o) {
+ return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
+ }
+
+ /** Content-based hash code, consistent with {@link #equals(Object)}. */
+ @Override public int hashCode() {
+ return Arrays.hashCode(arr);
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
index 2f6c15b..f87e4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
@@ -240,26 +240,6 @@ public class GridNearReadRepairFuture extends GridNearReadRepairAbstractFuture {
*/
public Map<KeyCacheObject, EntryGetResult> fixWithMajority(Collection<KeyCacheObject> inconsistentKeys)
throws IgniteCheckedException {
- /** */
- class ByteArrayWrapper {
- final byte[] arr;
-
- /** */
- public ByteArrayWrapper(byte[] arr) {
- this.arr = arr;
- }
-
- /** */
- @Override public boolean equals(Object o) {
- return Arrays.equals(arr, ((ByteArrayWrapper)o).arr);
- }
-
- /** */
- @Override public int hashCode() {
- return Arrays.hashCode(arr);
- }
- }
-
Set<KeyCacheObject> irreparableSet = new HashSet<>(inconsistentKeys.size());
Map<KeyCacheObject, EntryGetResult> fixedMap = new HashMap<>(inconsistentKeys.size());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
index 0948c30..7914da1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
@@ -117,7 +117,17 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
log.info("Consistency check started " +
"[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + ", strategy=" + strategy + "]");
- VisorConsistencyStatusTask.MAP.put(arg, "0/" + part.fullSize());
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("[node=").append(ignite.localNode());
+ sb.append(", cacheGroup=").append(grpCtx.cacheOrGroupName());
+ sb.append(", part=").append(p).append("]");
+
+ String statusKey = sb.toString();
+
+ if (VisorConsistencyStatusTask.MAP.putIfAbsent(statusKey, "0/" + part.fullSize()) != null)
+ throw new IllegalStateException("Consistency check already started " +
+ "[grp=" + grpCtx.cacheOrGroupName() + ", part=" + p + "]");
long cnt = 0;
long statusTs = 0;
@@ -125,7 +135,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
part.reserve();
try {
- IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener();
+ IgnitePredicate<CacheConsistencyViolationEvent> lsnr = new CacheConsistencyViolationEventListener(cacheName);
ignite.events().localListen(lsnr, EVT_CONSISTENCY_VIOLATION);
@@ -169,7 +179,7 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
log.info("Consistency check progress [grp=" + grpCtx.cacheOrGroupName() +
", part=" + p + ", checked=" + cnt + "/" + part.fullSize() + "]");
- VisorConsistencyStatusTask.MAP.put(arg, cnt + "/" + part.fullSize());
+ VisorConsistencyStatusTask.MAP.put(statusKey, cnt + "/" + part.fullSize());
}
}
@@ -185,11 +195,11 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
finally {
part.release();
- VisorConsistencyStatusTask.MAP.remove(arg);
+ VisorConsistencyStatusTask.MAP.remove(statusKey);
}
if (!evts.isEmpty())
- return processEvents(cctx, p, cnt);
+ return processEvents(p, cnt);
else
return NOTHING_FOUND + " [processed=" + cnt + "]\n";
}
@@ -197,41 +207,43 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
/**
*
*/
- private String processEvents(GridCacheContext<Object, Object> cctx, int part, long cnt) {
+ private String processEvents(int part, long cnt) {
int found = 0;
int fixed = 0;
StringBuilder sb = new StringBuilder();
for (CacheConsistencyViolationEvent evt : evts) {
- for (Map.Entry<?, Map<ClusterNode, CacheConsistencyViolationEvent.EntryInfo>> entry : evt.getEntries().entrySet()) {
+ for (Map.Entry<?, CacheConsistencyViolationEvent.EntriesInfo> entry : evt.getEntries().entrySet()) {
Object key = entry.getKey();
- if (cctx.affinity().partition(key) != part)
+ if (entry.getValue().partition() != part)
continue; // Skipping other partitions results, which are generated by concurrent executions.
found++;
sb.append("Key: ").append(key)
.append(" (cache: ").append(evt.getCacheName())
- .append(", strategy: ").append(evt.getStrategy()).append(")").append("\n");
+ .append(", partition: ").append(entry.getValue().partition())
+ .append(", strategy: ").append(evt.getStrategy())
+ .append(")").append("\n");
if (evt.getFixedEntries().containsKey(key))
sb.append(" Fixed: ").append(evt.getFixedEntries().get(key)).append("\n");
- for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping : entry.getValue().entrySet()) {
+ for (Map.Entry<ClusterNode, CacheConsistencyViolationEvent.EntryInfo> mapping :
+ entry.getValue().getMapping().entrySet()) {
ClusterNode node = mapping.getKey();
CacheConsistencyViolationEvent.EntryInfo info = mapping.getValue();
sb.append(" Node: ").append(node).append("\n")
.append(" Value: ").append(info.getValue()).append("\n")
- .append(" Version: ").append(info.getVersion()).append("\n");
+ .append(" Version: ").append(info.getVersion()).append("\n")
+ .append(" On primary: ").append(info.isPrimary()).append("\n");
if (info.getVersion() != null)
sb.append(" Other cluster version: ").append(info.getVersion().otherClusterVersion()).append("\n");
- sb.append(" On primary: ").append(info.isPrimary()).append("\n");
-
if (info.isCorrect())
sb.append(" Considered as a CORRECT value!").append("\n");
}
@@ -259,13 +271,26 @@ public class VisorConsistencyRepairTask extends AbstractConsistencyTask<VisorCon
/** Serial version uid. */
private static final long serialVersionUID = 0L;
+ /** Cache name. */
+ private final String cacheName;
+
+ /**
+ * @param name Cache name; events reporting a different cache name are not collected by this listener.
+ */
+ private CacheConsistencyViolationEventListener(String name) {
+ cacheName = name;
+ }
+
/**
* {@inheritDoc}
*/
- @Override public boolean apply(CacheConsistencyViolationEvent e) {
- assert e instanceof CacheConsistencyViolationEvent;
+ @Override public boolean apply(CacheConsistencyViolationEvent evt) {
+ assert evt instanceof CacheConsistencyViolationEvent;
+
+ if (!evt.getCacheName().equals(cacheName))
+ return true; // Skipping other caches results, which are generated by concurrent executions.
- evts.add(e);
+ evts.add(evt);
return true;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
index 50c8b3f..6008d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskArg.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.visor.consistency;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.Objects;
import org.apache.ignite.cache.ReadRepairStrategy;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -92,22 +91,4 @@ public class VisorConsistencyRepairTaskArg extends IgniteDataTransferObject {
public ReadRepairStrategy strategy() {
return strategy;
}
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- VisorConsistencyRepairTaskArg arg = (VisorConsistencyRepairTaskArg)o;
-
- return part == arg.part && Objects.equals(cacheName, arg.cacheName);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(cacheName, part);
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
index af001e6..60f43d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyStatusTask.java
@@ -37,7 +37,7 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St
public static final String NOTHING_FOUND = "Consistency check/repair operations were NOT found.";
/** Status map. */
- public static final ConcurrentHashMap<VisorConsistencyRepairTaskArg, String> MAP = new ConcurrentHashMap<>();
+ public static final ConcurrentHashMap<String, String> MAP = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override protected VisorJob<Void, String> job(Void arg) {
@@ -80,13 +80,9 @@ public class VisorConsistencyStatusTask extends AbstractConsistencyTask<Void, St
StringBuilder sb = new StringBuilder();
- for (Map.Entry<VisorConsistencyRepairTaskArg, String> entry : MAP.entrySet()) {
- VisorConsistencyRepairTaskArg args = entry.getKey();
- String status = entry.getValue();
-
- sb.append("\n Cache: ").append(args.cacheName()).append("\n")
- .append(" Partition: ").append(args.part()).append("\n")
- .append(" Status: ").append(status).append("\n");
+ for (Map.Entry<String, String> entry : MAP.entrySet()) {
+ sb.append("\n Job: ").append(entry.getKey()).append("\n")
+ .append(" Status: ").append(entry.getValue()).append("\n");
}
return sb.toString();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
index 72358b4..4e1dcec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/consistency/AbstractReadRepairTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
@@ -77,7 +78,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
private static final ConcurrentLinkedDeque<CacheConsistencyViolationEvent> evtDeq = new ConcurrentLinkedDeque<>();
/** Key. */
- protected static int iterableKey;
+ private static final AtomicInteger iterableKey = new AtomicInteger();
/** Backups count. */
protected Integer backupsCount() {
@@ -140,7 +141,7 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
- log.info("Checked " + iterableKey + " keys");
+ log.info("Checked " + iterableKey.get() + " keys");
stopAllGrids();
@@ -204,7 +205,9 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
assertEquals(atomicityMode() == TRANSACTIONAL ? data.strategy : ReadRepairStrategy.CHECK_ONLY, evt.getStrategy());
// Optimistic and read committed transactions produce per key fixes.
- evtEntries.putAll(evt.getEntries());
+ for (Map.Entry<Object, CacheConsistencyViolationEvent.EntriesInfo> entries : evt.getEntries().entrySet())
+ evtEntries.put(entries.getKey(), entries.getValue().getMapping());
+
evtFixed.putAll(evt.getFixedEntries());
}
}
@@ -277,9 +280,11 @@ public abstract class AbstractReadRepairTest extends GridCommonAbstractTest {
try {
for (int j = 0; j < cnt; j++) {
- InconsistentMapping res = setDifferentValuesForSameKey(++iterableKey, misses, nulls, strategy);
+ int curKey = iterableKey.incrementAndGet();
+
+ InconsistentMapping res = setDifferentValuesForSameKey(curKey, misses, nulls, strategy);
- results.put(iterableKey, res);
+ results.put(curKey, res);
}
for (Ignite node : G.allGrids()) { // Check that cache filled properly.