You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/09/26 11:19:02 UTC
[ignite] branch master updated: IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 158bd2c0d65 IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.
158bd2c0d65 is described below
commit 158bd2c0d6546d01e55e541dc1b9a3156713d183
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Sep 26 14:16:39 2023 +0300
IGNITE-20462 Fix idle_verify hash conflicts for expiring entries - Fixes #10947.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../apache/ignite/util/GridCommandHandlerTest.java | 36 ++++++++++++++++++
.../SnapshotPartitionsQuickVerifyHandler.java | 3 +-
.../snapshot/SnapshotPartitionsVerifyHandler.java | 43 +++++++++++++++++++++-
.../processors/cache/verify/IdleVerifyUtility.java | 3 ++
.../cache/verify/PartitionHashRecordV2.java | 16 ++++++++
.../snapshot/IgniteClusterSnapshotCheckTest.java | 30 +++++++++++++++
6 files changed, 129 insertions(+), 2 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index abb7a50c4fb..6a40bfb738e 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -52,6 +53,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
@@ -2393,6 +2397,38 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
assertContains(log, testOut.toString(), "MOVING partitions");
}
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCacheIdleVerifyExpiringEntries() throws Exception {
+ IgniteEx ignite = startGrids(3);
+
+ ignite.cluster().state(ACTIVE);
+
+ IgniteCache<Object, Object> cache = ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 32))
+ .setBackups(1));
+
+ Random rnd = new Random();
+
+ // Put without expiry policy.
+ for (int i = 0; i < 5_000; i++)
+ cache.put(i, i);
+
+ // Put with expiry policy.
+ for (int i = 5_000; i < 10_000; i++) {
+ ExpiryPolicy expPol = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, rnd.nextInt(1_000)));
+ cache.withExpiryPolicy(expPol).put(i, i);
+ }
+
+ injectTestSystemOut();
+
+ assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
+
+ assertContains(log, testOut.toString(), "no conflicts have been found");
+ }
+
/** */
@Test
public void testCacheSequence() throws Exception {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
index 8bc5166e8d3..e378dec9b35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsQuickVerifyHandler.java
@@ -89,7 +89,8 @@ public class SnapshotPartitionsQuickVerifyHandler extends SnapshotPartitionsVeri
if (other == null)
return;
- if (val.size() != other.size() || !Objects.equals(val.updateCounter(), other.updateCounter()))
+ if ((!val.hasExpiringEntries() && !other.hasExpiringEntries() && val.size() != other.size())
+ || !Objects.equals(val.updateCounter(), other.updateCounter()))
wrnGrps.add(part.groupId());
}));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index 3547d5edf30..e24a51c6b08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -45,12 +45,15 @@ import org.apache.ignite.internal.management.cache.PartitionKeyV2;
import org.apache.ignite.internal.managers.encryption.EncryptionCacheKeyProvider;
import org.apache.ignite.internal.managers.encryption.GroupKey;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
+import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
@@ -102,7 +105,7 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
/** {@inheritDoc} */
@Override public Map<PartitionKeyV2, PartitionHashRecordV2> invoke(SnapshotHandlerContext opCtx) throws IgniteCheckedException {
if (!opCtx.snapshotDirectory().exists())
- throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());;
+ throw new IgniteCheckedException("Snapshot directory doesn't exists: " + opCtx.snapshotDirectory());
SnapshotMetadata meta = opCtx.metadata();
@@ -271,6 +274,10 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
assert hash != null : "OWNING must have hash: " + key;
+ // Size comparison should be skipped if the partition has entries to expire.
+ if (hasExpiringEntries(snpCtx, pageStore, pageBuff, io.getPendingTreeRoot(pageAddr)))
+ hash.hasExpiringEntries(true);
+
res.put(key, hash);
}
catch (IOException e) {
@@ -294,6 +301,40 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
return res;
}
+ /** */
+ private boolean hasExpiringEntries(
+ GridKernalContext ctx,
+ PageStore pageStore,
+ ByteBuffer pageBuff,
+ long pendingTreeMetaId
+ ) throws IgniteCheckedException {
+ if (pendingTreeMetaId == 0)
+ return false;
+
+ long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+ pageBuff.clear();
+ pageStore.read(pendingTreeMetaId, pageBuff, true);
+
+ if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
+ ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
+
+ BPlusMetaIO treeIO = BPlusMetaIO.VERSIONS.forPage(pageAddr);
+
+ int rootLvl = treeIO.getRootLevel(pageAddr);
+ long rootId = treeIO.getFirstPageId(pageAddr, rootLvl);
+
+ pageBuff.clear();
+ pageStore.read(rootId, pageBuff, true);
+
+ if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE)
+ ctx.compress().decompressPage(pageBuff, pageStore.getPageSize());
+
+ BPlusIO<?> rootIO = PageIO.getPageIO(pageBuff);
+
+ return rootIO.getCount(pageAddr) != 0;
+ }
+
/** {@inheritDoc} */
@Override public void complete(String name,
Collection<SnapshotHandlerResult<Map<PartitionKeyV2, PartitionHashRecordV2>>> results) throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
index b93e1cf1394..79c1806d085 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -304,6 +304,9 @@ public class IdleVerifyUtility {
while (it.hasNextX()) {
CacheDataRow row = it.nextX();
+ if (row.expireTime() > 0)
+ continue;
+
partHash += row.key().hashCode();
partVerHash += row.version().hashCode(); // Detects ABA problem.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
index df7c98452d1..852fe856d3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
@@ -95,6 +95,10 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
@GridToStringExclude
private int regKeys;
+ /** Whether the partition has entries to expire. */
+ @GridToStringExclude
+ private boolean hasExpiringEntries;
+
/**
* @param partKey Partition key.
* @param isPrimary Is primary.
@@ -219,6 +223,16 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
return regKeys;
}
+ /** */
+ public boolean hasExpiringEntries() {
+ return hasExpiringEntries;
+ }
+
+ /** */
+ public void hasExpiringEntries(boolean hasExpiringEntries) {
+ this.hasExpiringEntries = hasExpiringEntries;
+ }
+
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
out.writeObject(partKey);
@@ -233,6 +247,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
out.writeInt(noCfKeys);
out.writeInt(binKeys);
out.writeInt(regKeys);
+ out.writeBoolean(hasExpiringEntries);
}
/** {@inheritDoc} */
@@ -255,6 +270,7 @@ public class PartitionHashRecordV2 extends VisorDataTransferObject {
noCfKeys = in.readInt();
binKeys = in.readInt();
regKeys = in.readInt();
+ hasExpiringEntries = in.readBoolean();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index f9286a2510d..c3e811d8e71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -36,7 +36,11 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -600,6 +604,32 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
assertTrue("Threads created: " + createdThreads, createdThreads < iterations);
}
+ /** */
+ @Test
+ public void testClusterSnapshotCheckWithExpiring() throws Exception {
+ IgniteEx ignite = startGrids(3);
+
+ ignite.cluster().state(ACTIVE);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache(new CacheConfiguration<>("expCache")
+ .setAffinity(new RendezvousAffinityFunction(false, 32)).setBackups(1));
+
+ Random rnd = new Random();
+
+ for (int i = 0; i < 10_000; i++) {
+ cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS,
+ rnd.nextInt(10_000)))).put(i, i);
+ }
+
+ long timeout = getTestTimeout();
+
+ snp(ignite).createSnapshot(SNAPSHOT_NAME).get(timeout);
+
+ SnapshotPartitionsVerifyTaskResult res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, null).get(timeout);
+
+ assertFalse(res.idleVerifyResult().hasConflicts());
+ }
+
/**
* @param cls Class of running task.
* @param results Results of compute.