You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/03/16 21:06:43 UTC
[ignite] branch master updated: IGNITE-14305 Fix snapshot check
command for indexes (#8874)
This is an automated email from the ASF dual-hosted git repository.
mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2b4e6ee IGNITE-14305 Fix snapshot check command for indexes (#8874)
2b4e6ee is described below
commit 2b4e6ee63d475840e160298b04e967e0d71bd577
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Wed Mar 17 00:06:18 2021 +0300
IGNITE-14305 Fix snapshot check command for indexes (#8874)
---
.../persistence/file/FilePageStoreManager.java | 2 +-
.../persistence/snapshot/SnapshotFutureTask.java | 20 +++-
.../persistence/snapshot/SnapshotMetadata.java | 5 +-
.../snapshot/SnapshotPartitionsVerifyTask.java | 9 ++
.../snapshot/AbstractSnapshotSelfTest.java | 3 +
.../IgniteClusterSnapshotCheckWithIndexesTest.java | 127 +++++++++++++++++++++
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 2 +
7 files changed, 159 insertions(+), 9 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7110dcd..a7c682d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1043,7 +1043,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
return Arrays.stream(files)
.filter(File::isFile)
- .filter(f -> f.getName().startsWith(PART_FILE_PREFIX))
+ .filter(f -> f.getName().endsWith(FILE_SUFFIX))
.collect(Collectors.toList());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 89484f7..978d146 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
@@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -389,19 +389,19 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
int grpId = e.getKey();
Set<Integer> grpParts = e.getValue();
- GridDhtPartitionTopology top = cctx.cache().cacheGroup(grpId).topology();
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
Iterator<GridDhtLocalPartition> iter;
if (grpParts == null)
- iter = top.currentLocalPartitions().iterator();
+ iter = gctx.topology().currentLocalPartitions().iterator();
else {
if (grpParts.contains(INDEX_PARTITION)) {
throw new IgniteCheckedException("Index partition cannot be included into snapshot if " +
" set of cache group partitions has been explicitly provided [grpId=" + grpId + ']');
}
- iter = F.iterator(grpParts, top::localPartition, false);
+ iter = F.iterator(grpParts, gctx.topology()::localPartition, false);
}
Set<Integer> owning = new HashSet<>();
@@ -420,6 +420,8 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
missed.add(part.id());
}
+ boolean affNode = gctx.nodeFilter() == null || gctx.nodeFilter().apply(cctx.localNode());
+
if (grpParts != null) {
// Partition has been provided for cache group, but some of them are not in OWNING state.
// Exit with an error.
@@ -437,7 +439,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
"Partitions which have different states skipped. Index partitions has also been skipped " +
"[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
}
- else if (missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
+ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
owning.add(INDEX_PARTITION);
}
@@ -620,7 +622,13 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
if (closeFut == null) {
Throwable err0 = err.get();
- closeFut = CompletableFuture.runAsync(() -> onDone(partFileLengths.keySet(), err0),
+ // Zero partitions haven't to be written on disk.
+ Set<GroupPartitionId> taken = partFileLengths.entrySet().stream()
+ .filter(e -> e.getValue() > 0)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0),
cctx.kernalContext().getSystemExecutorService());
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index d2a8918..f5fb6cba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -137,7 +137,7 @@ public class SnapshotMetadata implements Serializable {
}
/**
- * @return The list of cache groups ids which were included into snapshot.
+ * @return The list of cache group IDs which were included into the snapshot globally.
*/
public List<Integer> cacheGroupIds() {
return grpIds;
@@ -151,7 +151,8 @@ public class SnapshotMetadata implements Serializable {
}
/**
- * @return Map of cache group partitions from which snapshot has been taken on local node.
+ * @return Map of cache group partitions from which snapshot has been taken on the local node (which is actually
+ * saved on the local node because some of them may be skipped due to cache node filter).
*/
public Map<Integer, Set<Integer>> partitions() {
return locParts;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index e717a45..42ef810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -57,6 +57,8 @@ import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
@@ -64,6 +66,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
/**
* Task for checking snapshot partitions consistency the same way as {@link VerifyBackupPartitionsTaskV2} does.
@@ -233,6 +236,12 @@ public class SnapshotPartitionsVerifyTask
val -> {
})
) {
+ if (partId == INDEX_PARTITION) {
+ checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX);
+
+ return null;
+ }
+
ByteBuffer pageBuff = buff.get();
pageBuff.clear();
pageStore.read(0, pageBuff, true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 809a23d..74976b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -104,6 +104,9 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
/** Number of cache keys to pre-create at node start. */
protected static final int CACHE_KEYS_RANGE = 1024;
+ /** Timeout in milliseconds to await for snapshot operation being completed. */
+ protected static final int SNAPSHOT_AWAIT_TIMEOUT_MS = 15_000;
+
/** List of collected snapshot test events. */
protected final List<Integer> locEvts = new CopyOnWriteArrayList<>();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
new file mode 100644
index 0000000..e0aee42e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+
+/**
+ * Cluster-wide snapshot test check command with indexes.
+ */
+public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotSelfTest {
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckEmptyCache() throws Exception {
+ IgniteEx ignite = startGridsWithCache(3, 0, key -> new Account(key, key),
+ txFilteredCache("indexed"));
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+ IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+ assertTrue(F.isEmpty(res.exceptions()));
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testClusterSnapshotCheckWithIndexes() throws Exception {
+ IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, key -> new Account(key, key),
+ txFilteredCache("indexed"));
+
+ ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+ IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found.");
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testClusterSnapshotCheckWithNodeFilter() throws Exception {
+ startGridsWithoutCache(2);
+
+ IgniteCache<Integer, Account> cache1 = grid(0).createCache(txFilteredCache("cache0")
+ .setNodeFilter(new SelfNodeFilter(grid(0).localNode().id())));
+ IgniteCache<Integer, Account> cache2 = grid(1).createCache(txFilteredCache("cache1")
+ .setNodeFilter(new SelfNodeFilter(grid(1).localNode().id())));
+
+ for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+ cache1.put(i, new Account(i, i));
+ cache2.put(i, new Account(i, i));
+ }
+
+ grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+ IdleVerifyResultV2 res = grid(0).context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+ StringBuilder b = new StringBuilder();
+ res.print(b::append, true);
+
+ assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+ assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found.");
+ }
+
+ /** Node filter to run cache on single node. */
+ private static class SelfNodeFilter implements IgnitePredicate<ClusterNode> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Node id to run cache at. */
+ private final UUID nodeId;
+
+ /** @param nodeId Node id to run cache at. */
+ public SelfNodeFilter(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(ClusterNode node) {
+ return node.id().equals(nodeId);
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @return Cache configuration.
+ */
+ private static CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
+ return txCacheConfig(new CacheConfiguration<Integer, Account>(cacheName))
+ .setCacheMode(CacheMode.REPLICATED)
+ .setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName())));
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index d018457..239d293 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotIni
import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
@@ -57,6 +58,7 @@ import org.junit.runners.Suite;
RebuildIndexTest.class,
RebuildIndexWithMVCCTest.class,
IgniteClusterSnapshotWithIndexesTest.class,
+ IgniteClusterSnapshotCheckWithIndexesTest.class,
ClientReconnectWithSqlTableConfiguredTest.class,
MultipleParallelCacheDeleteDeadlockTest.class,
CacheGroupReencryptionTest.class,