You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/03/16 21:06:43 UTC

[ignite] branch master updated: IGNITE-14305 Fix snapshot check command for indexes (#8874)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b4e6ee  IGNITE-14305 Fix snapshot check command for indexes (#8874)
2b4e6ee is described below

commit 2b4e6ee63d475840e160298b04e967e0d71bd577
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Wed Mar 17 00:06:18 2021 +0300

    IGNITE-14305 Fix snapshot check command for indexes (#8874)
---
 .../persistence/file/FilePageStoreManager.java     |   2 +-
 .../persistence/snapshot/SnapshotFutureTask.java   |  20 +++-
 .../persistence/snapshot/SnapshotMetadata.java     |   5 +-
 .../snapshot/SnapshotPartitionsVerifyTask.java     |   9 ++
 .../snapshot/AbstractSnapshotSelfTest.java         |   3 +
 .../IgniteClusterSnapshotCheckWithIndexesTest.java | 127 +++++++++++++++++++++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   2 +
 7 files changed, 159 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7110dcd..a7c682d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1043,7 +1043,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
         return Arrays.stream(files)
             .filter(File::isFile)
-            .filter(f -> f.getName().startsWith(PART_FILE_PREFIX))
+            .filter(f -> f.getName().endsWith(FILE_SUFFIX))
             .collect(Collectors.toList());
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 89484f7..978d146 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
@@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -389,19 +389,19 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
                 int grpId = e.getKey();
                 Set<Integer> grpParts = e.getValue();
 
-                GridDhtPartitionTopology top = cctx.cache().cacheGroup(grpId).topology();
+                CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
 
                 Iterator<GridDhtLocalPartition> iter;
 
                 if (grpParts == null)
-                    iter = top.currentLocalPartitions().iterator();
+                    iter = gctx.topology().currentLocalPartitions().iterator();
                 else {
                     if (grpParts.contains(INDEX_PARTITION)) {
                         throw new IgniteCheckedException("Index partition cannot be included into snapshot if " +
                             " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']');
                     }
 
-                    iter = F.iterator(grpParts, top::localPartition, false);
+                    iter = F.iterator(grpParts, gctx.topology()::localPartition, false);
                 }
 
                 Set<Integer> owning = new HashSet<>();
@@ -420,6 +420,8 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
                         missed.add(part.id());
                 }
 
+                boolean affNode = gctx.nodeFilter() == null || gctx.nodeFilter().apply(cctx.localNode());
+
                 if (grpParts != null) {
                     // Partition has been provided for cache group, but some of them are not in OWNING state.
                     // Exit with an error.
@@ -437,7 +439,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
                             "Partitions which have different states skipped. Index partitions has also been skipped " +
                             "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
                     }
-                    else if (missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
+                    else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabled())
                         owning.add(INDEX_PARTITION);
                 }
 
@@ -620,7 +622,13 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
         if (closeFut == null) {
             Throwable err0 = err.get();
 
-            closeFut = CompletableFuture.runAsync(() -> onDone(partFileLengths.keySet(), err0),
+            // Zero partitions haven't to be written on disk.
+            Set<GroupPartitionId> taken = partFileLengths.entrySet().stream()
+                .filter(e -> e.getValue() > 0)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+            closeFut = CompletableFuture.runAsync(() -> onDone(taken, err0),
                 cctx.kernalContext().getSystemExecutorService());
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index d2a8918..f5fb6cba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -137,7 +137,7 @@ public class SnapshotMetadata implements Serializable {
     }
 
     /**
-     * @return The list of cache groups ids which were included into snapshot.
+     * @return The list of cache group IDs which were included into the snapshot globally.
      */
     public List<Integer> cacheGroupIds() {
         return grpIds;
@@ -151,7 +151,8 @@ public class SnapshotMetadata implements Serializable {
     }
 
     /**
-     * @return Map of cache group partitions from which snapshot has been taken on local node.
+     * @return Map of cache group partitions from which snapshot has been taken on the local node (which is actually
+     * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
         return locParts;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index e717a45..42ef810 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -57,6 +57,8 @@ import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
@@ -64,6 +66,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
 import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
 
 /**
  * Task for checking snapshot partitions consistency the same way as {@link VerifyBackupPartitionsTaskV2} does.
@@ -233,6 +236,12 @@ public class SnapshotPartitionsVerifyTask
                                     val -> {
                                     })
                             ) {
+                                if (partId == INDEX_PARTITION) {
+                                    checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX);
+
+                                    return null;
+                                }
+
                                 ByteBuffer pageBuff = buff.get();
                                 pageBuff.clear();
                                 pageStore.read(0, pageBuff, true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 809a23d..74976b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -104,6 +104,9 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
     /** Number of cache keys to pre-create at node start. */
     protected static final int CACHE_KEYS_RANGE = 1024;
 
+    /** Timeout in milliseconds to await for snapshot operation being completed. */
+    protected static final int SNAPSHOT_AWAIT_TIMEOUT_MS = 15_000;
+
     /** List of collected snapshot test events. */
     protected final List<Integer> locEvts = new CopyOnWriteArrayList<>();
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
new file mode 100644
index 0000000..e0aee42e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckWithIndexesTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.junit.Test;
+
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+
+/**
+ * Cluster-wide snapshot test check command with indexes.
+ */
+public class IgniteClusterSnapshotCheckWithIndexesTest extends AbstractSnapshotSelfTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckEmptyCache() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, 0, key -> new Account(key, key),
+            txFilteredCache("indexed"));
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+        IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+        assertTrue(F.isEmpty(res.exceptions()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckWithIndexes() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, key -> new Account(key, key),
+            txFilteredCache("indexed"));
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+        IdleVerifyResultV2 res = ignite.context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found.");
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testClusterSnapshotCheckWithNodeFilter() throws Exception {
+        startGridsWithoutCache(2);
+
+        IgniteCache<Integer, Account> cache1 = grid(0).createCache(txFilteredCache("cache0")
+            .setNodeFilter(new SelfNodeFilter(grid(0).localNode().id())));
+        IgniteCache<Integer, Account> cache2 = grid(1).createCache(txFilteredCache("cache1")
+            .setNodeFilter(new SelfNodeFilter(grid(1).localNode().id())));
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+            cache1.put(i, new Account(i, i));
+            cache2.put(i, new Account(i, i));
+        }
+
+        grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(SNAPSHOT_AWAIT_TIMEOUT_MS);
+
+        IdleVerifyResultV2 res = grid(0).context().cache().context().snapshotMgr().checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found.");
+    }
+
+    /** Node filter to run cache on single node. */
+    private static class SelfNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Node id to run cache at. */
+        private final UUID nodeId;
+
+        /** @param nodeId Node id to run cache at. */
+        public SelfNodeFilter(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.id().equals(nodeId);
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     */
+    private static CacheConfiguration<Integer, Account> txFilteredCache(String cacheName) {
+        return txCacheConfig(new CacheConfiguration<Integer, Account>(cacheName))
+            .setCacheMode(CacheMode.REPLICATED)
+            .setQueryEntities(singletonList(new QueryEntity(Integer.class.getName(), Account.class.getName())));
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index d018457..239d293 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotIni
 import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.MultipleParallelCacheDeleteDeadlockTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
 import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
@@ -57,6 +58,7 @@ import org.junit.runners.Suite;
     RebuildIndexTest.class,
     RebuildIndexWithMVCCTest.class,
     IgniteClusterSnapshotWithIndexesTest.class,
+    IgniteClusterSnapshotCheckWithIndexesTest.class,
     ClientReconnectWithSqlTableConfiguredTest.class,
     MultipleParallelCacheDeleteDeadlockTest.class,
     CacheGroupReencryptionTest.class,