You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/09/26 11:19:02 UTC

[ignite] branch master updated: IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 158bd2c0d65 IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.
158bd2c0d65 is described below

commit 158bd2c0d6546d01e55e541dc1b9a3156713d183
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Sep 26 14:16:39 2023 +0300

    IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../apache/ignite/util/GridCommandHandlerTest.java | 36 ++++++++++++++++++
 .../SnapshotPartitionsQuickVerifyHandler.java      |  3 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  | 43 +++++++++++++++++++++-
 .../processors/cache/verify/IdleVerifyUtility.java |  3 ++
 .../cache/verify/PartitionHashRecordV2.java        | 16 ++++++++
 .../snapshot/IgniteClusterSnapshotCheckTest.java   | 30 +++++++++++++++
 6 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index abb7a50c4fb..6a40bfb738e 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -52,6 +53,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
@@ -2393,6 +2397,38 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
         assertContains(log, testOut.toString(), "MOVING partitions");
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCacheIdleVerifyExpiringEntries() throws Exception {
+        IgniteEx ignite = startGrids(3);
+
+        ignite.cluster().state(ACTIVE);
+
+        IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            .setBackups(1));
+
+        Random rnd = new Random();
+
+        // Put without expiry policy.
+        for (int i = 0; i < 5_000; i++)
+            cache.put(i, i);
+
+        // Put with expiry policy.
+        for (int i = 5_000; i < 10_000; i++) {
+            ExpiryPolicy expPol = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, rnd.nextInt(1_000)));
+            cache.withExpiryPolicy(expPol).put(i, i);
+        }
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+        assertContains(log, testOut.toString(), "no conflicts have been found");
+    }
+
     /** */
     @Test
     public void testCacheSequence() throws Exception {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
index 8bc5166e8d3..e378dec9b35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -89,7 +89,8 @@ public class SnapshotPartitionsQuickVerifyHandler extends SnapshotPartitionsVeri
             if (other == null)
                 return;
 
-            if (val.size() != other.size() || !Objects.equals(val.updateCounter(), other.updateCounter()))
+            if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && val.size() != other.size())
+                || !Objects.equals(val.updateCounter(), other.updateCounter()))
                 wrnGrps.add(part.groupId());
         }));
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 3547d5edf30..e24a51c6b08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -45,12 +45,15 @@ import org.apache.ignite.internal.management.cache.PartitionKeyV2;
 import org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
 import org.apache.ignite.internal.managers.encryption.GroupKey;
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
+import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
 import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
@@ -102,7 +105,7 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
     /** {@inheritDoc} */
     @Override public Map<PartitionKeyV2, PartitionHashRecordV2> invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException {
         if (!opCtx.snapshotDirectory().exists())
-            throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());;
+            throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());
 
         SnapshotMetadata meta = opCtx.metadata();
 
@@ -271,6 +274,10 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
 
                         assert hash != null : "OWNING must have hash: " + key;
 
+                        // We should skip size comparison if there are entries to expire exist.
+                        if (hasExpiringEntries(snpCtx, pageStore, pageBuff, io.getPendingTreeRoot(pageAddr)))
+                            hash.hasExpiringEntries(true);
+
                         res.put(key, hash);
                     }
                     catch (IOException e) {
@@ -294,6 +301,40 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
         return res;
     }
 
+    /** */
+    private boolean hasExpiringEntries(
+        GridKernalContext ctx,
+        PageStore pageStore,
+        ByteBuffer pageBuff,
+        long pendingTreeMetaId
+    ) throws IgniteCheckedException {
+        if (pendingTreeMetaId == 0)
+            return false;
+
+        long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+        pageBuff.clear();
+        pageStore.read(pendingTreeMetaId, pageBuff, true);
+
+        if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
+            ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
+
+        BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr);
+
+        int rootLvl = treeIO.getRootLevel(pageAddr);
+        long rootId = treeIO.getFirstPageId(pageAddr, rootLvl);
+
+        pageBuff.clear();
+        pageStore.read(rootId, pageBuff, true);
+
+        if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
+            ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
+
+        BPlusIO<?> rootIO = PageIO.getPageIO(pageBuff);
+
+        return rootIO.getCount(pageAddr) != 0;
+    }
+
     /** {@inheritDoc} */
     @Override public void complete(String name,
         Collection<SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>> results) throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index b93e1cf1394..79c1806d085 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -304,6 +304,9 @@ public class IdleVerifyUtility {
         while (it.hasNextX()) {
             CacheDataRow row = it.nextX();
 
+            if (row.expireTime() > 0)
+                continue;
+
             partHash += row.key().hashCode();
             partVerHash += row.version().hashCode(); // Detects ABA problem.
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
index df7c98452d1..852fe856d3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
@@ -95,6 +95,10 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
     @GridToStringExclude
     private int regKeys;
 
+    /** If partition has entries to expire. */
+    @GridToStringExclude
+    private boolean hasExpiringEntries;
+
     /**
      * @param partKey Partition key.
      * @param isPrimary Is primary.
@@ -219,6 +223,16 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         return regKeys;
     }
 
+    /** */
+    public boolean hasExpiringEntries() {
+        return hasExpiringEntries;
+    }
+
+    /** */
+    public void hasExpiringEntries(boolean hasExpiringEntries) {
+        this.hasExpiringEntries = hasExpiringEntries;
+    }
+
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
         out.writeObject(partKey);
@@ -233,6 +247,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         out.writeInt(noCfKeys);
         out.writeInt(binKeys);
         out.writeInt(regKeys);
+        out.writeBoolean(hasExpiringEntries);
     }
 
     /** {@inheritDoc} */
@@ -255,6 +270,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
         noCfKeys = in.readInt();
         binKeys = in.readInt();
         regKeys = in.readInt();
+        hasExpiringEntries = in.readBoolean();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index f9286a2510d..c3e811d8e71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -36,7 +36,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -600,6 +604,32 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
         assertTrue("Threads created: " + createdThreads, createdThreads < iterations);
     }
 
+    /** */
+    @Test
+    public void testClusterSnapshotCheckWithExpiring() throws Exception {
+        IgniteEx ignite = startGrids(3);
+
+        ignite.cluster().state(ACTIVE);
+
+        IgniteCache<Object, Object> cache = ignite.getOrCreateCache(new CacheConfiguration<>("expCache")
+            .setAffinity(new RendezvousAffinityFunction(false, 32)).setBackups(1));
+
+        Random rnd = new Random();
+
+        for (int i = 0; i < 10_000; i++) {
+            cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS,
+                rnd.nextInt(10_000)))).put(i, i);
+        }
+
+        long timeout = getTestTimeout();
+
+        snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout);
+
+        SnapshotPartitionsVerifyTaskResult res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);
+
+        assertFalse(res.idleVerifyResult().hasConflicts());
+    }
+
     /**
      * @param cls Class of running task.
      * @param results Results of compute.