You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/06/13 16:46:08 UTC
[ignite] branch master updated: IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 616e1eddfae IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)
616e1eddfae is described below
commit 616e1eddfaea4fa0ed5e9b203954c4e810079179
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Jun 13 19:45:58 2022 +0300
IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)
---
.../apache/ignite/util/GridCommandHandlerTest.java | 162 +++++++++++++++++++++
.../processors/cache/PartitionUpdateCounter.java | 5 +
.../cache/PartitionUpdateCounterDebugWrapper.java | 5 +
.../cache/PartitionUpdateCounterErrorWrapper.java | 5 +
.../cache/PartitionUpdateCounterTrackingImpl.java | 55 ++++++-
.../cache/PartitionUpdateCounterVolatileImpl.java | 5 +
.../cache/verify/IdleVerifyResultV2.java | 4 +-
.../processors/cache/verify/IdleVerifyUtility.java | 2 +-
.../cache/verify/PartitionHashRecordV2.java | 17 ++-
.../cache/verify/VerifyBackupPartitionsTaskV2.java | 2 +-
10 files changed, 247 insertions(+), 15 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 1669e2251e6..296cee8783a 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -44,8 +44,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
@@ -62,6 +64,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.ShutdownPolicy;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.BaselineNode;
import org.apache.ignite.cluster.ClusterNode;
@@ -86,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -136,9 +140,11 @@ import org.junit.Test;
import static java.io.File.separatorChar;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CLUSTER_NAME;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
@@ -2310,6 +2316,162 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
fail("Should be found dump with conflicts");
}
+ /**
+ * Runs the idle_verify gap scenario for an {@code ATOMIC} cache; a hash conflict but no counter conflict is expected.
+ */
+ @Test
+ public void testCacheIdleVerifyChecksGapsAtomic() throws Exception {
+ testCacheIdleVerifyChecksGaps(ATOMIC);
+ }
+
+ /**
+ * Runs the idle_verify gap scenario for a {@code TRANSACTIONAL} cache; both a counter and a hash conflict are expected.
+ */
+ @Test
+ public void testCacheIdleVerifyChecksGapsTx() throws Exception {
+ testCacheIdleVerifyChecksGaps(TRANSACTIONAL);
+ }
+
+ /**
+ * Blocks update messages sent from the primary node (intended to create counter gaps), then checks what idle_verify reports.
+ */
+ private void testCacheIdleVerifyChecksGaps(CacheAtomicityMode atomicityMode) throws Exception {
+ int parts = 1; // Single partition, so all keys belong to it.
+
+ IgniteEx ignite = startGrids(3);
+
+ ignite.cluster().active(true);
+
+ int backups = 2;
+
+ IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
+ .setAffinity(new RendezvousAffinityFunction(false, parts))
+ .setBackups(backups)
+ .setName(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(atomicityMode)
+ .setWriteSynchronizationMode(PRIMARY_SYNC)
+ .setReadFromBackup(true));
+
+ int cnt = 0; // Total number of puts performed by this test so far.
+
+ for (int i = 0; i < 100; i++) {
+ cache.put(i, i);
+
+ cnt++;
+ }
+
+ int cntFrom = cnt; // Number of puts made before any message blocking starts.
+
+ Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+ Ignite backup = backupNode(0L, DEFAULT_CACHE_NAME);
+
+ TestRecordingCommunicationSpi primSpi = TestRecordingCommunicationSpi.spi(prim);
+
+ AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(); // Holds the latch of the current iteration.
+
+ primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxFinishRequest ||
+ msg instanceof GridDhtAtomicSingleUpdateRequest) {
+ CountDownLatch blockLatch = latchRef.get();
+
+ boolean block = blockLatch.getCount() > 0; // Block only while the current latch is still open.
+
+ blockLatch.countDown();
+
+ return block; // Generating counter gaps.
+ }
+ else
+ return false;
+ }
+ });
+
+ int blockedKey = cntFrom + 1_000;
+ int committedKey = blockedKey + 1_000;
+
+ int blockedFrom = blockedKey; // First key of the blocked key range, kept for the checks below.
+ int committedFrom = committedKey; // First key of the committed key range, kept for the checks below.
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ List<String> gaps = new ArrayList<>(); // Expected "missed" entries, in the format asserted for backups below.
+
+ Consumer<Integer> cachePut = (key) -> {
+ if (atomicityMode == TRANSACTIONAL)
+ try (Transaction tx = prim.transactions().txStart()) {
+ prim.cache(DEFAULT_CACHE_NAME).put(key, key);
+
+ tx.commit();
+ }
+ else
+ prim.cache(DEFAULT_CACHE_NAME).put(key, key);
+ };
+
+ for (int it = 0; it < 10; it++) {
+ int range = rnd.nextInt(3); // 0, 1 or 2: size of both the blocked and the committed batch.
+
+ CountDownLatch blockLatch = new CountDownLatch(backups * range); // Presumably one message per backup per blocked put.
+
+ latchRef.set(blockLatch);
+
+ for (int i = 0; i < range; i++) {
+ cachePut.accept(blockedKey++);
+
+ cnt++;
+ }
+
+ if (range == 1)
+ gaps.add(String.valueOf(cnt)); // Single missed update.
+ else if (range > 1)
+ gaps.add((cnt - range + 1) + " - " + cnt); // Inclusive range of missed updates.
+
+ blockLatch.await(); // Wait until the predicate has counted the latch down to zero.
+
+ for (int i = 0; i < range; i++) {
+ cachePut.accept(committedKey++);
+
+ cnt++;
+ }
+ }
+
+ for (int key = blockedFrom; key < blockedKey; key++) {
+ assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+ assertNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key)); // Commit is blocked.
+ }
+
+ for (int key = committedFrom; key < committedKey; key++) {
+ assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+ assertNotNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+ }
+
+ G.restart(true);
+
+ ignite.cluster().active(true);
+
+ awaitPartitionMapExchange();
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ if (atomicityMode == TRANSACTIONAL) {
+ assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=1, " +
+ "hashConflicts=1]");
+
+ assertContains(log, testOut.toString(),
+ "updateCntr=[lwm=" + cnt + ", missed=[], hwm=" + cnt + "]"); // Primary
+
+ assertContains(log, testOut.toString(),
+ "updateCntr=[lwm=" + cntFrom + ", missed=" + gaps + ", hwm=" + cnt + "]"); // Backups.
+ }
+ else {
+ assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=0, " +
+ "hashConflicts=1]");
+
+ assertContains(log, testOut.toString(), "updateCntr=" + cnt); // All
+ }
+ }
+
/**
* @return Build matcher for dump file name.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index 7b8f1c0b235..f37bf4c26dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -148,4 +148,9 @@ public interface PartitionUpdateCounter extends Iterable<long[]> {
* @return A deep copy of current instance.
*/
public PartitionUpdateCounter copy();
+
+ /**
+ * @return Counter state as an object suitable for {@code equals()} comparison and for printing.
+ */
+ public Object comparableState();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index 2c57c77a32b..87cc5803fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -272,6 +272,11 @@ public class PartitionUpdateCounterDebugWrapper implements PartitionUpdateCounte
return new PartitionUpdateCounterDebugWrapper(partId, delegate.copy());
}
+ /** {@inheritDoc} */
+ @Override public Object comparableState() {
+ return delegate.comparableState(); // Pure delegation: the state comes from the wrapped counter.
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return delegate.toString();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
index 3a7224dda9f..e08317c0315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
@@ -176,6 +176,11 @@ public class PartitionUpdateCounterErrorWrapper implements PartitionUpdateCounte
return new PartitionUpdateCounterErrorWrapper(partId, delegate.copy());
}
+ /** {@inheritDoc} */
+ @Override public Object comparableState() {
+ return delegate.comparableState(); // Pure delegation: the state comes from the wrapped counter.
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return delegate.toString();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index f4e7a06f5f9..57da22e13f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -22,7 +22,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -445,15 +447,60 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
return F.iterator(queue.values().iterator(), item -> new long[] {item.start, item.delta}, true);
}
+ /**
+ * @return Gaps between {@code cntr} and the queued items, rendered as a list string such as {@code [101, 105 - 107]}.
+ */
+ private String gaps() { // Not synchronized itself; the visible callers invoke it inside synchronized (this).
+ List<String> gaps = new ArrayList<>();
+
+ long prev = cntr.get(); // Start from the current value of cntr.
+
+ for (Item item : queue.values()) {
+ if (prev + 1 == item.start)
+ gaps.add(String.valueOf(item.start)); // Gap of exactly one value.
+ else
+ gaps.add((prev + 1) + " - " + item.start); // Gap rendered as "from - to".
+
+ prev = item.start + item.delta; // Continue after the range taken by this item.
+ }
+
+ return gaps.toString();
+ }
+
+ /** {@inheritDoc} Here the state is a {@code String} of the form {@code [lwm=..., missed=[...], hwm=...]}. */
+ @Override public Object comparableState() {
+ String missed;
+ long lwm;
+ long hwm;
+
+ synchronized (this) { // Read all three values while holding the same monitor.
+ missed = gaps();
+
+ lwm = get();
+
+ hwm = highestAppliedCounter();
+ }
+
+ return new SB() // String is built outside the synchronized block from the captured values.
+ .a("[lwm=")
+ .a(lwm)
+ .a(", missed=")
+ .a(missed)
+ .a(", hwm=")
+ .a(hwm)
+ .a(']')
+ .toString();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- String quequeStr;
+ String missed;
long lwm;
long hwm;
long maxApplied;
synchronized (this) {
- quequeStr = queue.toString();
+ missed = gaps();
lwm = get();
@@ -465,8 +512,8 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
return new SB()
.a("Counter [lwm=")
.a(lwm)
- .a(", holes=")
- .a(quequeStr)
+ .a(", missed=")
+ .a(missed)
.a(", maxApplied=")
.a(maxApplied)
.a(", hwm=")
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index ae32a511e58..054ecb6df8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -177,4 +177,9 @@ public class PartitionUpdateCounterVolatileImpl implements PartitionUpdateCounte
return copy;
}
+
+ /** {@inheritDoc} */
+ @Override public Object comparableState() {
+ return get(); // This implementation exposes only the value of get() as its state.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
index b8e2bb5045c..220e92ae9d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
@@ -83,7 +83,7 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
) {
for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> e : clusterHashes.entrySet()) {
Integer partHash = null;
- Long updateCntr = null;
+ Object updateCntr = null;
for (PartitionHashRecordV2 record : e.getValue()) {
if (record.partitionState() == PartitionHashRecordV2.PartitionState.MOVING) {
@@ -106,7 +106,7 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
updateCntr = record.updateCounter();
}
else {
- if (record.updateCounter() != updateCntr)
+ if (!record.updateCounter().equals(updateCntr))
cntrConflicts.putIfAbsent(e.getKey(), e.getValue());
if (record.partitionHash() != partHash)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index 026ec973a1d..5e8e08cd5db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -240,7 +240,7 @@ public class IdleVerifyUtility {
*/
public static @Nullable PartitionHashRecordV2 calculatePartitionHash(
PartitionKeyV2 partKey,
- long updCntr,
+ Object updCntr,
Object consId,
GridDhtPartitionState state,
boolean isPrimary,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
index 810f46b9372..2bcbb76bc1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
@@ -52,8 +52,9 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
@GridToStringExclude
private int partHash;
- /** Update counter. */
- private long updateCntr;
+ /** Update counter's state. */
+ @GridToStringInclude
+ private Object updateCntr;
/** Size. */
@GridToStringExclude
@@ -76,7 +77,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
boolean isPrimary,
Object consistentId,
int partHash,
- long updateCntr,
+ Object updateCntr,
long size,
PartitionState partitionState
) {
@@ -87,6 +88,8 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
this.updateCntr = updateCntr;
this.size = size;
this.partitionState = partitionState;
+
+ assert updateCntr != null;
}
/**
@@ -126,7 +129,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
/**
* @return Update counter.
*/
- public long updateCounter() {
+ public Object updateCounter() {
return updateCntr;
}
@@ -150,7 +153,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
out.writeBoolean(isPrimary);
out.writeObject(consistentId);
out.writeInt(partHash);
- out.writeLong(updateCntr);
+ out.writeObject(updateCntr);
out.writeLong(size);
U.writeEnum(out, partitionState);
}
@@ -162,7 +165,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
isPrimary = in.readBoolean();
consistentId = in.readObject();
partHash = in.readInt();
- updateCntr = in.readLong();
+ updateCntr = in.readObject();
size = in.readLong();
if (protoVer >= V2)
@@ -193,7 +196,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
PartitionHashRecordV2 v2 = (PartitionHashRecordV2)o;
- return partHash == v2.partHash && updateCntr == v2.updateCntr && size == v2.size && partKey.equals(v2.partKey) &&
+ return partHash == v2.partHash && updateCntr.equals(v2.updateCntr) && size == v2.size && partKey.equals(v2.partKey) &&
consistentId.equals(v2.consistentId) && partitionState == v2.partitionState;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index 8765d301581..0376d81e02c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
@@ -478,7 +478,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
PartitionKeyV2 key = new PartitionKeyV2(gctx.groupId(), part.id(), gctx.cacheOrGroupName());
PartitionHashRecordV2 hash = calculatePartitionHash(key,
- updateCntrBefore == null ? 0 : updateCntrBefore.get(),
+ updateCntrBefore == null ? 0 : updateCntrBefore.comparableState(),
ignite.context().discovery().localNode().consistentId(),
part.state(),
part.primary(gctx.topology().readyTopologyVersion()),