You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2022/06/13 16:46:08 UTC

[ignite] branch master updated: IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 616e1eddfae IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)
616e1eddfae is described below

commit 616e1eddfaea4fa0ed5e9b203954c4e810079179
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Jun 13 19:45:58 2022 +0300

    IGNITE-17114 Idle_verify must print and compare full partition counter state instead of just LWM (#10071)
---
 .../apache/ignite/util/GridCommandHandlerTest.java | 162 +++++++++++++++++++++
 .../processors/cache/PartitionUpdateCounter.java   |   5 +
 .../cache/PartitionUpdateCounterDebugWrapper.java  |   5 +
 .../cache/PartitionUpdateCounterErrorWrapper.java  |   5 +
 .../cache/PartitionUpdateCounterTrackingImpl.java  |  55 ++++++-
 .../cache/PartitionUpdateCounterVolatileImpl.java  |   5 +
 .../cache/verify/IdleVerifyResultV2.java           |   4 +-
 .../processors/cache/verify/IdleVerifyUtility.java |   2 +-
 .../cache/verify/PartitionHashRecordV2.java        |  17 ++-
 .../cache/verify/VerifyBackupPartitionsTaskV2.java |   2 +-
 10 files changed, 247 insertions(+), 15 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 1669e2251e6..296cee8783a 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -44,8 +44,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
@@ -62,6 +64,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.ShutdownPolicy;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -86,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -136,9 +140,11 @@ import org.junit.Test;
 
 import static java.io.File.separatorChar;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CLUSTER_NAME;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
@@ -2310,6 +2316,162 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
             fail("Should be found dump with conflicts");
     }
 
+    /**
+     * Tests that idle verify checks gaps.
+     */
+    @Test
+    public void testCacheIdleVerifyChecksGapsAtomic() throws Exception {
+        testCacheIdleVerifyChecksGaps(ATOMIC);
+    }
+
+    /**
+     * Tests that idle verify checks gaps.
+     */
+    @Test
+    public void testCacheIdleVerifyChecksGapsTx() throws Exception {
+        testCacheIdleVerifyChecksGaps(TRANSACTIONAL);
+    }
+
+    /**
+     * Tests that idle verify checks gaps.
+     */
+    private void testCacheIdleVerifyChecksGaps(CacheAtomicityMode atomicityMode) throws Exception {
+        int parts = 1;
+
+        IgniteEx ignite = startGrids(3);
+
+        ignite.cluster().active(true);
+
+        int backups = 2;
+
+        IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>()
+            .setAffinity(new RendezvousAffinityFunction(false, parts))
+            .setBackups(backups)
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(atomicityMode)
+            .setWriteSynchronizationMode(PRIMARY_SYNC)
+            .setReadFromBackup(true));
+
+        int cnt = 0;
+
+        for (int i = 0; i < 100; i++) {
+            cache.put(i, i);
+
+            cnt++;
+        }
+
+        int cntFrom = cnt;
+
+        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+        Ignite backup = backupNode(0L, DEFAULT_CACHE_NAME);
+
+        TestRecordingCommunicationSpi primSpi = TestRecordingCommunicationSpi.spi(prim);
+
+        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
+
+        primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtTxFinishRequest ||
+                    msg instanceof GridDhtAtomicSingleUpdateRequest) {
+                    CountDownLatch blockLatch = latchRef.get();
+
+                    boolean block = blockLatch.getCount() > 0;
+
+                    blockLatch.countDown();
+
+                    return block; // Generating counter gaps.
+                }
+                else
+                    return false;
+            }
+        });
+
+        int blockedKey = cntFrom + 1_000;
+        int committedKey = blockedKey + 1_000;
+
+        int blockedFrom = blockedKey;
+        int committedFrom = committedKey;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        List<String> gaps = new ArrayList<>();
+
+        Consumer<Integer> cachePut = (key) -> {
+            if (atomicityMode == TRANSACTIONAL)
+                try (Transaction tx = prim.transactions().txStart()) {
+                    prim.cache(DEFAULT_CACHE_NAME).put(key, key);
+
+                    tx.commit();
+                }
+            else
+                prim.cache(DEFAULT_CACHE_NAME).put(key, key);
+        };
+
+        for (int it = 0; it < 10; it++) {
+            int range = rnd.nextInt(3);
+
+            CountDownLatch blockLatch = new CountDownLatch(backups * range);
+
+            latchRef.set(blockLatch);
+
+            for (int i = 0; i < range; i++) {
+                cachePut.accept(blockedKey++);
+
+                cnt++;
+            }
+
+            if (range == 1)
+                gaps.add(String.valueOf(cnt));
+            else if (range > 1)
+                gaps.add((cnt - range + 1) + " - " + cnt);
+
+            blockLatch.await();
+
+            for (int i = 0; i < range; i++) {
+                cachePut.accept(committedKey++);
+
+                cnt++;
+            }
+        }
+
+        for (int key = blockedFrom; key < blockedKey; key++) {
+            assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+            assertNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key)); // Commit is blocked.
+        }
+
+        for (int key = committedFrom; key < committedKey; key++) {
+            assertNotNull(prim.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+            assertNotNull(backup.getOrCreateCache(DEFAULT_CACHE_NAME).get(key));
+        }
+
+        G.restart(true);
+
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        if (atomicityMode == TRANSACTIONAL) {
+            assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=1, " +
+                "hashConflicts=1]");
+
+            assertContains(log, testOut.toString(),
+                "updateCntr=[lwm=" + cnt + ", missed=[], hwm=" + cnt + "]"); // Primary
+
+            assertContains(log, testOut.toString(),
+                "updateCntr=[lwm=" + cntFrom + ", missed=" + gaps + ", hwm=" + cnt + "]"); // Backups.
+        }
+        else {
+            assertContains(log, testOut.toString(), "conflict partitions has been found: [counterConflicts=0, " +
+                "hashConflicts=1]");
+
+            assertContains(log, testOut.toString(), "updateCntr=" + cnt); // All
+        }
+    }
+
     /**
      * @return Build matcher for dump file name.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index 7b8f1c0b235..f37bf4c26dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -148,4 +148,9 @@ public interface PartitionUpdateCounter extends Iterable<long[]> {
      * @return A deep copy of current instance.
      */
     public PartitionUpdateCounter copy();
+
+    /**
+     * @return Comparable counter state.
+     */
+    public Object comparableState();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
index 2c57c77a32b..87cc5803fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterDebugWrapper.java
@@ -272,6 +272,11 @@ public class PartitionUpdateCounterDebugWrapper implements PartitionUpdateCounte
         return new PartitionUpdateCounterDebugWrapper(partId, delegate.copy());
     }
 
+    /** {@inheritDoc} */
+    @Override public Object comparableState() {
+        return delegate.comparableState();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return delegate.toString();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
index 3a7224dda9f..e08317c0315 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterErrorWrapper.java
@@ -176,6 +176,11 @@ public class PartitionUpdateCounterErrorWrapper implements PartitionUpdateCounte
         return new PartitionUpdateCounterErrorWrapper(partId, delegate.copy());
     }
 
+    /** {@inheritDoc} */
+    @Override public Object comparableState() {
+        return delegate.comparableState();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return delegate.toString();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
index f4e7a06f5f9..57da22e13f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterTrackingImpl.java
@@ -22,7 +22,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -445,15 +447,60 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
         return F.iterator(queue.values().iterator(), item -> new long[] {item.start, item.delta}, true);
     }
 
+    /**
+     * Human-readable missed unordered updates.
+     */
+    private String gaps() {
+        List<String> gaps = new ArrayList<>();
+
+        long prev = cntr.get();
+
+        for (Item item : queue.values()) {
+            if (prev + 1 == item.start)
+                gaps.add(String.valueOf(item.start));
+            else
+                gaps.add((prev + 1) + " - " + item.start);
+
+            prev = item.start + item.delta;
+        }
+
+        return gaps.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object comparableState() {
+        String missed;
+        long lwm;
+        long hwm;
+
+        synchronized (this) {
+            missed = gaps();
+
+            lwm = get();
+
+            hwm = highestAppliedCounter();
+        }
+
+        return new SB()
+            .a("[lwm=")
+            .a(lwm)
+            .a(", missed=")
+            .a(missed)
+            .a(", hwm=")
+            .a(hwm)
+            .a(']')
+            .toString();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
-        String quequeStr;
+        String missed;
         long lwm;
         long hwm;
         long maxApplied;
 
         synchronized (this) {
-            quequeStr = queue.toString();
+            missed = gaps();
 
             lwm = get();
 
@@ -465,8 +512,8 @@ public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounte
         return new SB()
             .a("Counter [lwm=")
             .a(lwm)
-            .a(", holes=")
-            .a(quequeStr)
+            .a(", missed=")
+            .a(missed)
             .a(", maxApplied=")
             .a(maxApplied)
             .a(", hwm=")
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
index ae32a511e58..054ecb6df8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounterVolatileImpl.java
@@ -177,4 +177,9 @@ public class PartitionUpdateCounterVolatileImpl implements PartitionUpdateCounte
 
         return copy;
     }
+
+    /** {@inheritDoc} */
+    @Override public Object comparableState() {
+        return get();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
index b8e2bb5045c..220e92ae9d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
@@ -83,7 +83,7 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
     ) {
         for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> e : clusterHashes.entrySet()) {
             Integer partHash = null;
-            Long updateCntr = null;
+            Object updateCntr = null;
 
             for (PartitionHashRecordV2 record : e.getValue()) {
                 if (record.partitionState() == PartitionHashRecordV2.PartitionState.MOVING) {
@@ -106,7 +106,7 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
                     updateCntr = record.updateCounter();
                 }
                 else {
-                    if (record.updateCounter() != updateCntr)
+                    if (!record.updateCounter().equals(updateCntr))
                         cntrConflicts.putIfAbsent(e.getKey(), e.getValue());
 
                     if (record.partitionHash() != partHash)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index 026ec973a1d..5e8e08cd5db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -240,7 +240,7 @@ public class IdleVerifyUtility {
      */
     public static @Nullable PartitionHashRecordV2 calculatePartitionHash(
         PartitionKeyV2 partKey,
-        long updCntr,
+        Object updCntr,
         Object consId,
         GridDhtPartitionState state,
         boolean isPrimary,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
index 810f46b9372..2bcbb76bc1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
@@ -52,8 +52,9 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
     @GridToStringExclude
     private int partHash;
 
-    /** Update counter. */
-    private long updateCntr;
+    /** Update counter's state. */
+    @GridToStringInclude
+    private Object updateCntr;
 
     /** Size. */
     @GridToStringExclude
@@ -76,7 +77,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         boolean isPrimary,
         Object consistentId,
         int partHash,
-        long updateCntr,
+        Object updateCntr,
         long size,
         PartitionState partitionState
     ) {
@@ -87,6 +88,8 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         this.updateCntr = updateCntr;
         this.size = size;
         this.partitionState = partitionState;
+
+        assert updateCntr != null;
     }
 
     /**
@@ -126,7 +129,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
     /**
      * @return Update counter.
      */
-    public long updateCounter() {
+    public Object updateCounter() {
         return updateCntr;
     }
 
@@ -150,7 +153,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         out.writeBoolean(isPrimary);
         out.writeObject(consistentId);
         out.writeInt(partHash);
-        out.writeLong(updateCntr);
+        out.writeObject(updateCntr);
         out.writeLong(size);
         U.writeEnum(out, partitionState);
     }
@@ -162,7 +165,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         isPrimary = in.readBoolean();
         consistentId = in.readObject();
         partHash = in.readInt();
-        updateCntr = in.readLong();
+        updateCntr = in.readObject();
         size = in.readLong();
 
         if (protoVer >= V2)
@@ -193,7 +196,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
 
         PartitionHashRecordV2 v2 = (PartitionHashRecordV2)o;
 
-        return partHash == v2.partHash && updateCntr == v2.updateCntr && size == v2.size && partKey.equals(v2.partKey) &&
+        return partHash == v2.partHash && updateCntr.equals(v2.updateCntr) && size == v2.size && partKey.equals(v2.partKey) &&
             consistentId.equals(v2.consistentId) && partitionState == v2.partitionState;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index 8765d301581..0376d81e02c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
@@ -478,7 +478,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
                     PartitionKeyV2 key = new PartitionKeyV2(gctx.groupId(), part.id(), gctx.cacheOrGroupName());
 
                     PartitionHashRecordV2 hash = calculatePartitionHash(key,
-                        updateCntrBefore == null ? 0 : updateCntrBefore.get(),
+                        updateCntrBefore == null ? 0 : updateCntrBefore.comparableState(),
                         ignite.context().discovery().localNode().consistentId(),
                         part.state(),
                         part.primary(gctx.topology().readyTopologyVersion()),