Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/01/27 15:18:54 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Mmuzaf opened a new pull request #8715:
URL: https://github.com/apache/ignite/pull/8715


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask any advice on http://asf.slack.com _#ignite_ channel.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r582905105



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578568873



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578564615



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting to snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :
+                "smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId());
+
+            return meta;
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
+                smf.getAbsolutePath() + "]", e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return List of snapshot metadata for the given snapshot name on local node.
+     * If snapshot has been taken from local node the snapshot metadata for given
+     * local node will be placed on the first place.
+     */
+    public List<SnapshotMetadata> readSnapshotMetadatas(String snpName) {
+        A.notNullOrEmpty(snpName, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(snpName), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        File[] smfs = snapshotLocalDir(snpName).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        if (smfs == null)
+            throw new IgniteException("Snapshot directory doesn't exists or an I/O error occurred during directory read.");
+
+        Map<String, SnapshotMetadata> metasMap = new HashMap<>();
+        SnapshotMetadata prev = null;
+
+        for (File smf : smfs) {
+            SnapshotMetadata curr = readSnapshotMetadata(smf, marsh, cctx.gridConfig());
+
+            assert prev == null || sameSnapshotMetadata(prev, curr) : "prev=" + prev + ", curr=" + curr;

Review comment:
       Fixed.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r577543086



##########
File path: modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cache/IdleVerify.java
##########
@@ -322,8 +336,32 @@ private void cacheIdleVerifyV2(
         IdleVerifyResultV2 res = executeTask(client, VisorIdleVerifyTaskV2.class, taskArg, clientCfg);
 
         logParsedArgs(taskArg, System.out::print);
+        res.print(System.out::print, false);
+
+        if (F.isEmpty(res.exceptions()))
+            return;
 
-        res.print(System.out::print);
+        try {
+            File f = new File(U.resolveWorkDirectory(U.defaultWorkDirectory(), "", false),
+                IDLE_VERIFY_FILE_PREFIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
+
+            try (PrintWriter pw = new PrintWriter(f)) {
+                res.print(System.out::print, true);

Review comment:
       Why are we opening `pw` if the result is printed to `System.out`?
   BTW, you can minimize the number of changed lines and the number of errors like this one if you make `IdleVerifyResultV2#print(Consumer<String> printer, boolean printExceptionMessages)` public for the check snapshot command and leave the `IdleVerifyResultV2#print(Consumer<String> printer)` method as is for the idle verify command.
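
   A rough sketch of how this suggestion could look, under simplifying assumptions: `VerifyResult` is a hypothetical stand-in for `IdleVerifyResultV2`, and the report text is made up for illustration. The one-argument `print` stays unchanged for existing callers and delegates to the two-argument overload, while the file output is routed through `pw::print` rather than `System.out::print`.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for IdleVerifyResultV2; only the overload shape matters here. */
class VerifyResult {
    /** Exception messages collected from nodes (simplified to plain strings). */
    private final List<String> exceptions;

    VerifyResult(List<String> exceptions) {
        this.exceptions = exceptions;
    }

    /** Existing entry point: left as is, so idle verify callers do not change. */
    void print(Consumer<String> printer) {
        print(printer, false);
    }

    /** Overload for callers (e.g. a snapshot check command) that also want the messages. */
    void print(Consumer<String> printer, boolean printExceptionMessages) {
        if (exceptions.isEmpty()) {
            printer.accept("No conflicts have been found.\n");

            return;
        }

        printer.accept("Check failed on " + exceptions.size() + " node(s).\n");

        if (printExceptionMessages)
            exceptions.forEach(e -> printer.accept(e + "\n"));
    }

    /** Renders the full report through a writer; a file-backed PrintWriter would work the same way. */
    static String fullReport(VerifyResult res) {
        StringWriter buf = new StringWriter();

        try (PrintWriter pw = new PrintWriter(buf)) {
            res.print(pw::print, true); // pw::print, not System.out::print
        }

        return buf.toString();
    }

    public static void main(String[] args) {
        VerifyResult res = new VerifyResult(List.of("node-1: partition file is missing"));

        res.print(System.out::print);      // short form for the console
        System.out.print(fullReport(res)); // detailed form, as it would land in the file
    }
}
```

   With this shape the idle verify call site keeps calling `res.print(System.out::print)` unchanged, so the diff there shrinks to nothing.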

##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
##########
@@ -3086,6 +3088,31 @@ public void testCancelSnapshot() throws Exception {
             snpName -> assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "cancel", snpName)));
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testCheckSnapshot() throws Exception {
+        String snpName = "snapshot_02052020";
+
+        IgniteEx ig = startGrid(0);
+        ig.cluster().state(ACTIVE);
+
+        createCacheAndPreload(ig, 1000);
+
+        snp(ig).createSnapshot(snpName)
+            .get();
+
+        CommandHandler h = new CommandHandler();
+
+        assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "check", snpName));
+
+        StringBuilder sb = new StringBuilder();
+
+        ((IdleVerifyResultV2)h.getLastOperationResult()).print(sb::append, true);
+
+        assertContains(log, sb.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+

Review comment:
       Redundant NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
##########
@@ -1050,6 +1050,17 @@ else if (state.active()) {
         return bltNodes;
     }
 
+    /**
+     * @return Collection of available baseline nodes.
+     */
+    public Collection<UUID> onlineBaselineNodes() {

Review comment:
       1. The return value is a collection of node ids, so the method name should reflect this fact (onlineBaselineNodeIds or something like that).
   2. I doubt that we will need the ids of online nodes anywhere else in the future (usually we need the collection of nodes), so it was better when this code lived in IgniteSnapshotManager (its only usage).

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
##########
@@ -288,8 +245,6 @@ private void printSkippedPartitions(
 
                 printer.accept("Partition instances: " + entry.getValue() + "\n");
             }
-
-            printer.accept("\n");

Review comment:
       Why are these separators removed?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -999,6 +999,79 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return ccfgs;
     }
 
+    /**
+     * @param dir Directory to check.
+     * @return Files that match cache or cache group pattern.
+     */
+    public static List<File> cacheDirectories(File dir) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(dir.listFiles())
+            .sorted()
+            .filter(File::isDirectory)
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param partFileName Partition file name.
+     * @return Partition id.
+     */
+    public static int partId(String partFileName) {
+        if (partFileName.equals(INDEX_FILE_NAME))
+            return PageIdAllocator.INDEX_PARTITION;
+
+        if (partFileName.startsWith(PART_FILE_PREFIX))
+            return Integer.parseInt(partFileName.substring(PART_FILE_PREFIX.length(), partFileName.indexOf('.')));
+
+        throw new IllegalStateException("Illegal partition file name: " + partFileName);
+    }
+
+    /**
+     * @param cacheDir Cache directory to check.
+     * @return List of cache partitions in given directory.
+     */
+    public static List<File> cachePartitions(File cacheDir) {

Review comment:
       `cachePartitionFiles`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting to snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :

Review comment:
       The file name of the snapshot metadata is formed using `pdsSettings.folderName()`, which sometimes is not equal to the consistentId.
   This condition depends on the external environment (not only on internal code invariants), so I think we should throw a proper exception here instead of using an assertion.
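
   A minimal sketch of such a check, assuming simplified names: `MetafileNameCheck`, `ensureNameMatches` and the `expectedFolderName` parameter are hypothetical, `IllegalStateException` stands in for `IgniteException`, and the `.smf` extension value is assumed. The point is that Java assertions are skipped unless the JVM runs with `-ea`, whereas an explicit check always fires.

```java
import java.io.File;

/** Hypothetical helper; in the PR this logic would live next to readSnapshotMetadata. */
class MetafileNameCheck {
    /** Assumed value of the metafile extension constant. */
    static final String SNAPSHOT_METAFILE_EXT = ".smf";

    /**
     * Verifies that the metafile name matches the folder name the node is expected to use.
     * Throws instead of asserting, because the file system is external input.
     */
    static void ensureNameMatches(File smf, String expectedFolderName) {
        String name = smf.getName();

        if (!name.endsWith(SNAPSHOT_METAFILE_EXT))
            throw new IllegalStateException("Not a snapshot metafile: " + smf);

        String smfName = name.substring(0, name.length() - SNAPSHOT_METAFILE_EXT.length());

        if (!smfName.equals(expectedFolderName)) {
            throw new IllegalStateException("Snapshot metafile name doesn't match the expected folder name " +
                "[smfName=" + smfName + ", expected=" + expectedFolderName + ']');
        }
    }

    public static void main(String[] args) {
        ensureNameMatches(new File("node01.smf"), "node01"); // passes silently

        try {
            ensureNameMatches(new File("node01.smf"), "node02");
        }
        catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```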

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
##########
@@ -101,6 +110,27 @@ public long getUpdateCounter(long pageAddr) {
         return PageUtils.getLong(pageAddr, UPDATE_CNTR_OFF);
     }
 
+    /**
+     * @param buff Page buffer.
+     * @return Partition update counter.
+     */
+    public long getUpdateCounter(ByteBuffer buff) {
+        return buff.getLong(UPDATE_CNTR_OFF);
+    }
+
+    /**
+     * @param buff Page buffer.
+     * @return Partition update counter.
+     */
+    public boolean setUpdateCounter(ByteBuffer buff, long cntr) {

Review comment:
       I think all the new duplicated methods in this class are redundant, especially the setter, which is used only in the test. All you need is to get the address of the buffer, `long pageAddr = GridUnsafe.bufferAddress(pageBuff);`, and then you can use the standard methods.
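
   One side effect of keeping separate `ByteBuffer`-based accessors, shown as a plain-Java sketch (this is not Ignite code; `BufferOrderSketch` and the offset value are hypothetical): an absolute `getLong` depends on the byte order configured on the buffer, so the same page bytes can decode to different values if the caller forgets to set the order consistently. Going through a single set of accessors avoids maintaining that invariant in two places.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Plain-Java illustration of how the buffer's byte order affects absolute reads. */
class BufferOrderSketch {
    /** Hypothetical offset of the update counter inside the page. */
    static final int UPDATE_CNTR_OFF = 8;

    /** Mirrors the shape of the duplicated getter: the result depends on buff.order(). */
    static long readCounter(ByteBuffer buff) {
        return buff.getLong(UPDATE_CNTR_OFF);
    }

    /** Builds a small "page" with the counter written in the given byte order. */
    static ByteBuffer page(ByteOrder order, long cntr) {
        ByteBuffer buff = ByteBuffer.allocate(64).order(order);

        buff.putLong(UPDATE_CNTR_OFF, cntr);

        return buff;
    }

    public static void main(String[] args) {
        ByteBuffer buff = page(ByteOrder.LITTLE_ENDIAN, 42L);

        System.out.println(readCounter(buff)); // same order as the writer

        buff.order(ByteOrder.BIG_ENDIAN);

        System.out.println(readCounter(buff)); // same bytes, different order, different value
    }
}
```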

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory

Review comment:
       `pageStore` should be closed after use
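
   A sketch of the close-after-use pattern with try-with-resources, assuming the store type implements `java.io.Closeable` (this should be checked against the actual `FilePageStore`). `FakePageStore` and `PageStoreUsage` are hypothetical stand-ins used only for illustration:

```java
import java.io.Closeable;
import java.nio.ByteBuffer;

/** Hypothetical stand-in for a page store; it only illustrates the pattern. */
final class FakePageStore implements Closeable {
    /** Set once close() has been called. */
    boolean closed;

    /** Pretends to read a page by writing the page id at the start of the buffer. */
    void read(long pageId, ByteBuffer buf) {
        buf.putLong(0, pageId);
    }

    /** {@inheritDoc} */
    @Override public void close() {
        closed = true;
    }
}

/** Hypothetical caller. */
final class PageStoreUsage {
    /** Reads one page and releases the store even if read() throws. */
    static long readPage(FakePageStore store, long pageId, ByteBuffer buf) {
        try (FakePageStore s = store) {
            s.read(pageId, buf);

            return buf.getLong(0);
        }
    }
}
```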

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting to snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :
+                "smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId());
+
+            return meta;
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
+                smf.getAbsolutePath() + "]", e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return List of snapshot metadata for the given snapshot name on local node.
+     * If snapshot has been taken from local node the snapshot metadata for given
+     * local node will be placed on the first place.
+     */
+    public List<SnapshotMetadata> readSnapshotMetadatas(String snpName) {
+        A.notNullOrEmpty(snpName, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(snpName), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        File[] smfs = snapshotLocalDir(snpName).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        if (smfs == null)
+            throw new IgniteException("Snapshot directory doesn't exists or an I/O error occurred during directory read.");
+
+        Map<String, SnapshotMetadata> metasMap = new HashMap<>();
+        SnapshotMetadata prev = null;
+
+        for (File smf : smfs) {
+            SnapshotMetadata curr = readSnapshotMetadata(smf, marsh, cctx.gridConfig());
+
+            assert prev == null || sameSnapshotMetadata(prev, curr) : "prev=" + prev + ", curr=" + curr;
+
+            metasMap.put(curr.consistentId(), curr);
+
+            prev = curr;
+        }
+
+        SnapshotMetadata currNodeSmf = metasMap.remove(cctx.localNode().consistentId().toString());
+
+        // Snapshot metadata for the local node must be first in the result map.
+        if (currNodeSmf == null)
+            return new ArrayList<>(metasMap.values());
+        else {
+            List<SnapshotMetadata> result = new ArrayList<>();
+
+            result.add(currNodeSmf);
+            result.addAll(metasMap.values());
+
+            return result;
+        }
+    }
+
+    /**
+     * @param meta1 First snapshot metadata.
+     * @param meta2 Second snapshot metadata.
+     * @return {@code true} if given metadata belongs to the same snapshot.
+     */
+    public static boolean sameSnapshotMetadata(SnapshotMetadata meta1, SnapshotMetadata meta2) {

Review comment:
       The method name is ambiguous and can be mistaken for `SnapshotMetadata.equals()`. Perhaps it's better to move it to the `SnapshotMetadata` class and rename it to `sameSnapshot` or something like that.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);
+
+                if (allParts.remove(meta)) {

Review comment:
       If we have a snapshot from 100 nodes and only 10 nodes are currently online, each node will get its own snapshot, then the first node will get the other 90 snapshots, and the remaining 9 nodes will be idle.
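
   One possible way to even out the load, sketched under the assumption that several nodes can read the same metadata: give each item to the least-loaded node among those that hold it. `BalancedAssignment` and its plain string ids are hypothetical; this is a greedy illustration, not the task's actual mapping logic:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical greedy balancer; node and item ids are plain strings for brevity. */
final class BalancedAssignment {
    /**
     * @param available For each node, the items it is able to process.
     * @return For each item, the least-loaded node among those that hold it.
     */
    static Map<String, String> assign(Map<String, List<String>> available) {
        Map<String, Integer> load = new HashMap<>();
        Map<String, List<String>> holders = new TreeMap<>();

        // Invert the input: item -> nodes that can process it.
        for (Map.Entry<String, List<String>> e : available.entrySet()) {
            load.put(e.getKey(), 0);

            for (String item : e.getValue())
                holders.computeIfAbsent(item, k -> new ArrayList<>()).add(e.getKey());
        }

        Map<String, String> res = new TreeMap<>();

        for (Map.Entry<String, List<String>> e : holders.entrySet()) {
            String best = null;

            // Pick the holder with the fewest jobs assigned so far.
            for (String node : e.getValue()) {
                if (best == null || load.get(node) < load.get(best))
                    best = node;
            }

            load.merge(best, 1, Integer::sum);
            res.put(e.getKey(), best);
        }

        return res;
    }
}
```

   With two nodes that both hold the same four items, each node ends up with two jobs instead of one node taking all of them.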

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);

Review comment:
       This throws an `IndexOutOfBoundsException` if `e.getValue().size() == idx`.
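
   Valid indices run from `0` to `size() - 1`, so the guard has to skip the entry when `idx >= size()` rather than when `size() < idx`. A tiny illustration of the corrected comparison (`BoundsCheck` and `getOrNull` are hypothetical helpers):

```java
import java.util.List;

/** Hypothetical helper that demonstrates the corrected bounds check. */
final class BoundsCheck {
    /** @return Element at the given index, or {@code null} if the list is too short. */
    static <T> T getOrNull(List<T> list, int idx) {
        // Note ">=": idx == size() is already out of range.
        if (idx >= list.size())
            return null;

        return list.get(idx);
    }
}
```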

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,

Review comment:
       Passing the update counter and partition size this way looks weird. Perhaps this method should be inlined into `VisorVerifySnapshotPartitionsJob#execute`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);
+
+                if (allParts.remove(meta)) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.snapshotName(), meta.consistentId()),
+                        e.getKey());
+                }
+
+                if (allParts.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;
+
+        /**
+         * @param snpName Snapshot name to validate.
+         * @param consId Consistent snapshot metadata file name.
+         */
+        public VisorVerifySnapshotPartitionsJob(String snpName, String consId) {
+            this.snpName = snpName;
+            this.consId = consId;
+        }
+
+        @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
+            IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
+
+            if (log.isInfoEnabled()) {
+                log.info("Verify snapshot partitions procedure has been initiated " +
+                    "[snpName=" + snpName + ", consId=" + consId + ']');
+            }
+
+            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId);
+            Set<Integer> grps = new HashSet<>(meta.partitions().keySet());
+            Set<T2<File, File>> pairs = new HashSet<>();
+
+            for (File dir : snpMgr.snapshotCacheDirectories(snpName, consId)) {
+                int grpId = CU.cacheId(cacheGroupName(dir));
+
+                if (!grps.remove(grpId))
+                    continue;
+
+                Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+
+                for (File part : cachePartitions(dir)) {
+                    int partId = partId(part.getName());
+
+                    if (!parts.remove(partId))
+                        continue;
+
+                    pairs.add(new T2<>(dir, part));
+                }
+
+                if (!parts.isEmpty()) {
+                    throw new IgniteException("Snapshot data doesn't contain required cache group partition " +
+                        "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId +
+                        ", missed=" + parts + ", meta=" + meta + ']');
+                }
+            }
+
+            if (!grps.isEmpty()) {
+                throw new IgniteException("Snapshot data doesn't contain required cache groups " +
+                    "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId +
+                    ", meta=" + meta + ']');
+            }
+
+            Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+            ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
+                .order(ByteOrder.nativeOrder()));
+
+            try {
+                U.doInParallel(
+                    ignite.context().getSystemExecutorService(),
+                    pairs,
+                    pair -> {
+                        String grpName = pair.get1().getName();

Review comment:
       `get1()` returns the group directory, not the group name. It looks like you don't need the group directory here at all; you should store the group name instead. Also, the group directory can be obtained from the partition file using the `getParent()` method, so the code would be easier to read with a collection of partition files rather than a collection of pairs.
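
       A minimal sketch of the suggestion above: keep only partition files and derive the group name from each file's parent directory. The class name, the method name and the two prefix constants are hypothetical stand-ins chosen for this example (the PR uses `CACHE_DIR_PREFIX` and `CACHE_GRP_DIR_PREFIX`, whose values are assumed here), and `getParentFile()` is used so that the parent is returned as a `File`.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the class, constants and method names are hypothetical, not taken from the PR. */
final class PartFilesSketch {
    /** Assumed directory prefix of a shared cache group. */
    static final String GRP_DIR_PREFIX = "cacheGroup-";

    /** Assumed directory prefix of a standalone cache. */
    static final String CACHE_DIR_PREFIX = "cache-";

    /** Derives the cache or cache group name from the partition file's parent directory. */
    static String groupName(File part) {
        File dir = part.getParentFile();

        if (dir == null)
            throw new IllegalArgumentException("Partition file has no parent directory: " + part);

        String name = dir.getName();

        if (name.startsWith(GRP_DIR_PREFIX))
            return name.substring(GRP_DIR_PREFIX.length());

        if (name.startsWith(CACHE_DIR_PREFIX))
            return name.substring(CACHE_DIR_PREFIX.length());

        throw new IllegalArgumentException("Directory doesn't match the cache or cache group prefix: " + dir);
    }

    public static void main(String[] args) {
        // A flat collection of partition files replaces the collection of (dir, part) pairs.
        List<File> parts = new ArrayList<>();

        parts.add(new File(new File("cacheGroup-grp1"), "part-0.bin"));
        parts.add(new File(new File("cache-default"), "part-1.bin"));

        for (File part : parts)
            System.out.println(groupName(part) + " <- " + part.getName());
    }
}
```

       With this shape the parallel closure only receives a `File`, and everything else it needs can be computed from that file.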

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -599,7 +657,9 @@ else if (!F.isEmpty(err) || !missed.isEmpty()) {
      * @return Future which will be completed when the snapshot will be finalized.
      */
     private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
-        if (clusterSnpReq == null)
+        SnapshotOperationRequest req0 = clusterSnpReq;

Review comment:
       This looks like a redundant change.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Snapshot metadata file.
+ */
+public class SnapshotMetadata implements Serializable {

Review comment:
       The fields can be final; the default constructor and the setters are redundant.
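
       A minimal sketch of what such a class could look like, assuming plain Java serialization (which needs neither a no-arg constructor nor setters on a `Serializable` class). The class and field names are hypothetical and only loosely modelled on the accessors used elsewhere in this PR (`partitions()`, `pageSize()`, `consistentId()`); whether the marshaller used by the PR has additional requirements is not checked here.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch only: a hypothetical immutable metadata holder, not the PR's SnapshotMetadata. */
final class ImmutableMetaSketch implements Serializable {
    private static final long serialVersionUID = 0L;

    private final String snpName;
    private final String consId;
    private final int pageSize;
    private final Map<Integer, Set<Integer>> parts;

    ImmutableMetaSketch(String snpName, String consId, int pageSize, Map<Integer, Set<Integer>> parts) {
        this.snpName = snpName;
        this.consId = consId;
        this.pageSize = pageSize;

        // Defensive deep copy, so later changes to the caller's map are not visible here.
        Map<Integer, Set<Integer>> cp = new HashMap<>();

        for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
            cp.put(e.getKey(), Collections.unmodifiableSet(new HashSet<>(e.getValue())));

        this.parts = Collections.unmodifiableMap(cp);
    }

    String snapshotName() { return snpName; }

    String consistentId() { return consId; }

    int pageSize() { return pageSize; }

    /** Read-only view: group id -> partition ids. */
    Map<Integer, Set<Integer>> partitions() { return parts; }
}
```

       All state is set once in the constructor, so instances can be shared between threads without extra synchronization.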

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -999,6 +999,79 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return ccfgs;
     }
 
+    /**
+     * @param dir Directory to check.
+     * @return Files that match cache or cache group pattern.
+     */
+    public static List<File> cacheDirectories(File dir) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(dir.listFiles())
+            .sorted()
+            .filter(File::isDirectory)
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param partFileName Partition file name.
+     * @return Partition id.
+     */
+    public static int partId(String partFileName) {
+        if (partFileName.equals(INDEX_FILE_NAME))
+            return PageIdAllocator.INDEX_PARTITION;
+
+        if (partFileName.startsWith(PART_FILE_PREFIX))
+            return Integer.parseInt(partFileName.substring(PART_FILE_PREFIX.length(), partFileName.indexOf('.')));
+
+        throw new IllegalStateException("Illegal partition file name: " + partFileName);
+    }
+
+    /**
+     * @param cacheDir Cache directory to check.
+     * @return List of cache partitions in given directory.
+     */
+    public static List<File> cachePartitions(File cacheDir) {
+        File[] files = cacheDir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(files)
+            .filter(File::isFile)
+            .filter(f -> f.getName().startsWith(PART_FILE_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param file Directory to check.
+     * @return {@code true} if given directory is shared.
+     * @throws IgniteException If given directory doesn't match the cache pattern.
+     */
+    public static boolean isSharedGroup(File file) {
+        String name = file.getName();
+
+        if (name.startsWith(CACHE_GRP_DIR_PREFIX))
+            return true;
+        else if (name.startsWith(CACHE_DIR_PREFIX))
+            return false;
+        else
+            throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + file);
+    }
+
+    /**
+     * @param dir Cache directory on disk.
+     * @return Cache or cache group name.
+     */
+    public static String cacheGroupName(File dir) {
+        return isSharedGroup(dir) ?
+            dir.getName().replaceFirst("^" + CACHE_GRP_DIR_PREFIX, "") :

Review comment:
       A regexp is overkill here; let's use `substring` instead. Also, this is the only usage of `isSharedGroup`, so I think we don't need that method. Something like this would be more readable:
   ```
           if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
               return dir.getName().substring(CACHE_GRP_DIR_PREFIX.length());
           else if (dir.getName().startsWith(CACHE_DIR_PREFIX))
               return dir.getName().substring(CACHE_DIR_PREFIX.length());
           else
               throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir);
   ``` 

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),

Review comment:
       The file name of the snapshot metadata is formed using `pdsSettings.folderName()`, which is sometimes not equal to `consistentId`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting to snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :
+                "smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId());
+
+            return meta;
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
+                smf.getAbsolutePath() + "]", e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return List of snapshot metadata for the given snapshot name on local node.
+     * If snapshot has been taken from local node the snapshot metadata for given
+     * local node will be placed on the first place.
+     */
+    public List<SnapshotMetadata> readSnapshotMetadatas(String snpName) {
+        A.notNullOrEmpty(snpName, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(snpName), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        File[] smfs = snapshotLocalDir(snpName).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        if (smfs == null)
+            throw new IgniteException("Snapshot directory doesn't exists or an I/O error occurred during directory read.");
+
+        Map<String, SnapshotMetadata> metasMap = new HashMap<>();
+        SnapshotMetadata prev = null;
+
+        for (File smf : smfs) {
+            SnapshotMetadata curr = readSnapshotMetadata(smf, marsh, cctx.gridConfig());
+
+            assert prev == null || sameSnapshotMetadata(prev, curr) : "prev=" + prev + ", curr=" + curr;

Review comment:
       This condition depends on the external environment (not only on internal code invariants), so I think we should throw a proper exception here instead of using an assertion.
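
       To illustrate the point: Java assertions are disabled unless the JVM is started with `-ea`, so a check that depends on files on disk would be silently skipped in a typical production run. A minimal sketch of an explicit check follows; the class and method names are hypothetical, the comparison by snapshot name stands in for `sameSnapshotMetadata`, and `IllegalStateException` stands in for whatever exception type the project would prefer (for example `IgniteException`).

```java
import java.util.Objects;

/** Sketch only: hypothetical helper, not part of the PR. */
final class MetaCheckSketch {
    /**
     * Fails with an exception (regardless of the -ea flag) when two metadata files
     * found in the same snapshot directory describe different snapshots.
     */
    static void ensureSameSnapshot(String prevSnpName, String currSnpName) {
        if (!Objects.equals(prevSnpName, currSnpName)) {
            throw new IllegalStateException("Snapshot metadata files are inconsistent " +
                "[prev=" + prevSnpName + ", curr=" + currSnpName + ']');
        }
    }

    public static void main(String[] args) {
        ensureSameSnapshot("snp1", "snp1"); // Passes silently.

        try {
            ensureSameSnapshot("snp1", "snp2");
        }
        catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```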

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;

Review comment:
       A proper exception should be thrown here instead of an assertion.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));

Review comment:
       The directory name for snapshot data files is formed using `pdsSettings.folderName()`, which is sometimes not equal to `consistentId`.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -1166,6 +1182,201 @@ public void testClusterSnapshotInMemoryFail() throws Exception {
             "Snapshots on an in-memory clusters are not allowed.");
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheck() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertPartitionsSame(res);
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedPart() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+        assertTrue(part0.toFile().delete());
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Snapshot data doesn't contain required cache group partition");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedGroup() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path dir = Files.walk(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath())
+            .filter(d -> d.toFile().getName().equals(cacheDirName(dfltCacheCfg)))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Cache directory not found"));
+
+        assertTrue(dir.toString(), dir.toFile().exists());
+        assertTrue(U.delete(dir));
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Snapshot data doesn't contain required cache groups");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedMeta() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        File[] smfs = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        assertNotNull(smfs);
+        assertTrue(smfs[0].toString(), smfs[0].exists());
+        assertTrue(U.delete(smfs[0]));
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Some metadata is missing from the snapshot");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckWithNodeFilter() throws Exception {
+        IgniteEx ig0 = startGridsWithoutCache(3);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+            ig0.getOrCreateCache(txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+                .setNodeFilter(node -> node.consistentId().toString().endsWith("0"))).put(i, i);
+        }
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        IdleVerifyResultV2 res = snp(ig0).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertPartitionsSame(res);
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckPartitionCounters() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg.
+            setAffinity(new RendezvousAffinityFunction(false, 1)),
+            CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        FilePageStore pageStore = (FilePageStore)((FilePageStoreManager)ignite.context().cache().context().pageStore())
+            .getPageStoreFactory(CU.cacheId(dfltCacheCfg.getName()), false)
+            .createPageStore(getTypeByPartId(0),
+                () -> part0,
+                val -> {
+                });
+
+        ByteBuffer buff = ByteBuffer.allocate(ignite.configuration().getDataStorageConfiguration().getPageSize())
+            .order(ByteOrder.nativeOrder());
+
+        buff.clear();
+        pageStore.read(0, buff, false);
+
+        PagePartitionMetaIO io = PageIO.getPageIO(buff);
+        io.setUpdateCounter(buff, CACHE_KEYS_RANGE * 2);
+
+        pageStore.beginRecover();
+
+        buff.flip();
+        pageStore.write(PageIO.getPageId(buff), buff, 0, true);
+        pageStore.finishRecover();
+
+        pageStore.close();
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        System.out.println(">>>>>> " + b);

Review comment:
       ?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();

Review comment:
       Rename to `allMetas`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return Future that completes with the result of the snapshot check.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group directories in the given snapshot on the local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting the snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {

Review comment:
       It looks like this method doesn't need to be static: both usages pass the same `marsh` and `cfg`.
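
       A minimal sketch of that suggestion, using stand-in types rather than the real Ignite classes. `MetaReaderSketch`, `FakeMarshaller`, the `cfgName` field and the `.smf` extension are all hypothetical and exist only to show the shape of the refactoring: when every caller passes the same collaborators, the private helper can read them from instance fields.

```java
import java.io.File;

/** Illustrative only: these types are stand-ins, not the real Ignite classes. */
public class MetaReaderSketch {
    /** Hypothetical stand-in for a marshaller. */
    static class FakeMarshaller {
        String unmarshal(File f) {
            return "meta:" + f.getName();
        }
    }

    /** Collaborators held once by the instance. */
    private final FakeMarshaller marsh;
    private final String cfgName;

    MetaReaderSketch(FakeMarshaller marsh, String cfgName) {
        this.marsh = marsh;
        this.cfgName = cfgName;
    }

    /** Public entry point: builds the metafile name and delegates. */
    String readSnapshotMetadata(String consId) {
        // ".smf" is a placeholder extension for this sketch.
        return readSnapshotMetadata(new File(consId + ".smf"));
    }

    /** Instance helper: uses the fields instead of taking them as parameters. */
    private String readSnapshotMetadata(File smf) {
        return marsh.unmarshal(smf) + "@" + cfgName;
    }

    public static void main(String[] args) {
        MetaReaderSketch reader = new MetaReaderSketch(new FakeMarshaller(), "node0");

        System.out.println(reader.readSnapshotMetadata("abc"));
    }
}
```

       The trade-off is that a static helper is easier to call from contexts that have no manager instance; if no such caller exists, the shorter signature is arguably clearer.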







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r582843103



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allMetas = new HashSet<>();
+        clusterMetas.values().forEach(allMetas::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allMetas) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allMetas.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                Optional<SnapshotMetadata> meta = e.getValue().stream()
+                    .filter(allMetas::contains)
+                    .findFirst();
+
+                if (meta.isPresent() && allMetas.remove(meta.get())) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.get().snapshotName(), meta.get().consistentId()),
+                        e.getKey());
+                }
+
+                if (allMetas.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;

Review comment:
       final
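
       A small sketch of what the comment asks for, assuming the two fields are only ever assigned in the constructor. `VerifyJob` below is a hypothetical stand-in built on `java.util.concurrent.Callable`, not the real `ComputeJobAdapter` subclass from the patch.

```java
import java.util.concurrent.Callable;

/** Illustrative only: a stand-in job, not the Ignite compute API. */
public class FinalFieldsSketch {
    /** Hypothetical job whose parameters never change after construction. */
    static class VerifyJob implements Callable<String> {
        /** Snapshot name to validate. */
        private final String snpName;

        /** Consistent id of the node whose snapshot part is checked. */
        private final String consId;

        VerifyJob(String snpName, String consId) {
            this.snpName = snpName;
            this.consId = consId;
        }

        /** Returns a description of what the job would verify. */
        @Override public String call() {
            return snpName + '/' + consId;
        }
    }

    public static void main(String[] args) {
        System.out.println(new VerifyJob("snap1", "node0").call());
    }
}
```

       Fields that are populated by resource injection after construction (such as the `ignite` and `log` fields in the patch) would presumably have to stay non-final; only the constructor-assigned ones are candidates.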

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
##########
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-

Review comment:
       Nothing changed in this file except this line

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
##########
@@ -130,7 +131,7 @@
                 .setPageSize(4096))
             .setCacheConfiguration(dfltCacheCfg)
             .setClusterStateOnStart(INACTIVE)
-            .setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT)
+            .setIncludeEventTypes(EVTS_ALL)

Review comment:
       Do we really need all events? It looks like the new tests don't need any new events.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
##########
@@ -322,9 +275,9 @@ private void printConflicts(Consumer<String> printer) {
 
                 printer.accept("Partition instances: " + entry.getValue() + "\n");
             }
-
-            printer.accept("\n");
         }
+
+        printer.accept("\n");

Review comment:
       Let's keep the output format as is (and for `counterConflicts()` too).
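
       To make the difference concrete, here is a sketch under one assumption that the truncated hunk does not confirm: that the block being closed is a non-empty check around the loop. `ConflictPrintSketch` and `render` are hypothetical names. With the separator inside the block, nothing is printed when there are no conflicts; with the separator moved after the block, an empty line is always emitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: mimics the placement of the trailing separator. */
public class ConflictPrintSketch {
    /**
     * @param conflicts Conflict descriptions (may be empty).
     * @param separatorInsideBlock {@code true} for the old placement, {@code false} for the new one.
     * @return Rendered report fragment.
     */
    static String render(List<String> conflicts, boolean separatorInsideBlock) {
        StringBuilder sb = new StringBuilder();

        if (!conflicts.isEmpty()) {
            for (String c : conflicts)
                sb.append("Partition instances: ").append(c).append("\n");

            // Old placement: separator only when something was printed.
            if (separatorInsideBlock)
                sb.append("\n");
        }

        // New placement: separator is printed unconditionally.
        if (!separatorInsideBlock)
            sb.append("\n");

        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        List<String> some = Arrays.asList("p0", "p1");

        System.out.println("empty, inside:  [" + render(none, true) + "]");
        System.out.println("empty, outside: [" + render(none, false) + "]");
        System.out.println("same for non-empty: " + render(some, true).equals(render(some, false)));
    }
}
```

       Under that assumption the two placements only differ for an empty conflict set, which is exactly the case where existing consumers of the report would see an extra blank line.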

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;

Review comment:
       final

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allMetas = new HashSet<>();
+        clusterMetas.values().forEach(allMetas::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allMetas) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allMetas.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                Optional<SnapshotMetadata> meta = e.getValue().stream()
+                    .filter(allMetas::contains)
+                    .findFirst();
+
+                if (meta.isPresent() && allMetas.remove(meta.get())) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.get().snapshotName(), meta.get().consistentId()),
+                        e.getKey());
+                }
+
+                if (allMetas.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;
+
+        /**
+         * @param snpName Snapshot name to validate.
+         * @param consId Consistent snapshot metadata file name.
+         */
+        public VisorVerifySnapshotPartitionsJob(String snpName, String consId) {
+            this.snpName = snpName;
+            this.consId = consId;
+        }
+
+        @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
+            IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
+
+            if (log.isInfoEnabled()) {
+                log.info("Verify snapshot partitions procedure has been initiated " +
+                    "[snpName=" + snpName + ", consId=" + consId + ']');
+            }
+
+            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId);
+            Set<Integer> grps = new HashSet<>(meta.partitions().keySet());
+            Set<File> partFiles = new HashSet<>();
+
+            for (File dir : snpMgr.snapshotCacheDirectories(snpName, meta.folderName())) {
+                int grpId = CU.cacheId(cacheGroupName(dir));
+
+                if (!grps.remove(grpId))
+                    continue;
+
+                Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+
+                for (File part : cachePartitionFiles(dir)) {
+                    int partId = partId(part.getName());
+
+                    if (!parts.remove(partId))
+                        continue;
+
+                    partFiles.add(part);
+                }
+
+                if (!parts.isEmpty()) {
+                    throw new IgniteException("Snapshot data doesn't contain required cache group partition " +
+                        "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId +
+                        ", missed=" + parts + ", meta=" + meta + ']');
+                }
+            }
+
+            if (!grps.isEmpty()) {
+                throw new IgniteException("Snapshot data doesn't contain required cache groups " +
+                    "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId +
+                    ", meta=" + meta + ']');
+            }
+
+            Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+            ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
+                .order(ByteOrder.nativeOrder()));
+
+            try {
+                U.doInParallel(
+                    ignite.context().getSystemExecutorService(),
+                    partFiles,
+                    part -> {
+                        String grpName = cacheGroupName(part.getParentFile());
+                        int grpId = CU.cacheId(grpName);
+                        int partId = partId(part.getName());
+
+                        FilePageStoreManager storeMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+                        try {
+                            try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false)
+                                .createPageStore(getTypeByPartId(partId),
+                                    part::toPath,
+                                    val -> {
+                                    })
+                            ) {
+                                ByteBuffer pageBuff = buff.get();
+                                pageBuff.clear();
+                                pageStore.read(0, pageBuff, true);
+
+                                long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+                                PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+                                GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr));
+
+                                if (partState != OWNING)
+                                    throw new IgniteCheckedException("Snapshot partitions must be in OWNING state only: " + partState);

Review comment:
       Line is too long

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
##########
@@ -55,52 +57,42 @@
         "Cluster not idle. Modifications found in caches or groups: ";
 
     /**
-     * See {@link IdleVerifyUtility#checkPartitionsPageCrcSum(FilePageStore, CacheGroupContext, int, byte)}.
-     */
-    public static void checkPartitionsPageCrcSum(
-        @Nullable FilePageStoreManager pageStoreMgr,
-        CacheGroupContext grpCtx,
-        int partId,
-        byte pageType
-    ) throws IgniteCheckedException, GridNotIdleException {
-        if (!grpCtx.persistenceEnabled() || pageStoreMgr == null)
-            return;
-
-        FilePageStore pageStore = (FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), partId);
-
-        checkPartitionsPageCrcSum(pageStore, grpCtx, partId, pageType);
-    }
-
-    /**
-     * Checks CRC sum of pages with {@code pageType} page type stored in partiion with {@code partId} id and assosiated
-     * with cache group. <br/> Method could be invoked only on idle cluster!
+     * Checks CRC sum of pages with {@code pageType} page type stored in partition with {@code partId} id
+     * and associated with cache group.
      *
-     * @param pageStore Page store.
-     * @param grpCtx Passed cache group context.
+     * @param pageStoreSup Page store supplier.
      * @param partId Partition id.
      * @param pageType Page type. Possible types {@link PageIdAllocator#FLAG_DATA}, {@link PageIdAllocator#FLAG_IDX}
      *      and {@link PageIdAllocator#FLAG_AUX}.
-     * @throws IgniteCheckedException If reading page failed.
-     * @throws GridNotIdleException If cluster not idle.
      */
     public static void checkPartitionsPageCrcSum(
-        FilePageStore pageStore,
-        CacheGroupContext grpCtx,
+        IgniteThrowableSupplier<FilePageStore> pageStoreSup,

Review comment:
       Why do we need a supplier here? Looks like FilePageStore will be enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578554691



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -999,6 +999,79 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return ccfgs;
     }
 
+    /**
+     * @param dir Directory to check.
+     * @return Files that match cache or cache group pattern.
+     */
+    public static List<File> cacheDirectories(File dir) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(dir.listFiles())
+            .sorted()
+            .filter(File::isDirectory)
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param partFileName Partition file name.
+     * @return Partition id.
+     */
+    public static int partId(String partFileName) {
+        if (partFileName.equals(INDEX_FILE_NAME))
+            return PageIdAllocator.INDEX_PARTITION;
+
+        if (partFileName.startsWith(PART_FILE_PREFIX))
+            return Integer.parseInt(partFileName.substring(PART_FILE_PREFIX.length(), partFileName.indexOf('.')));
+
+        throw new IllegalStateException("Illegal partition file name: " + partFileName);
+    }
+
+    /**
+     * @param cacheDir Cache directory to check.
+     * @return List of cache partitions in given directory.
+     */
+    public static List<File> cachePartitions(File cacheDir) {

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -999,6 +999,79 @@ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         return ccfgs;
     }
 
+    /**
+     * @param dir Directory to check.
+     * @return Files that match cache or cache group pattern.
+     */
+    public static List<File> cacheDirectories(File dir) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(dir.listFiles())
+            .sorted()
+            .filter(File::isDirectory)
+            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param partFileName Partition file name.
+     * @return Partition id.
+     */
+    public static int partId(String partFileName) {
+        if (partFileName.equals(INDEX_FILE_NAME))
+            return PageIdAllocator.INDEX_PARTITION;
+
+        if (partFileName.startsWith(PART_FILE_PREFIX))
+            return Integer.parseInt(partFileName.substring(PART_FILE_PREFIX.length(), partFileName.indexOf('.')));
+
+        throw new IllegalStateException("Illegal partition file name: " + partFileName);
+    }
+
+    /**
+     * @param cacheDir Cache directory to check.
+     * @return List of cache partitions in given directory.
+     */
+    public static List<File> cachePartitions(File cacheDir) {
+        File[] files = cacheDir.listFiles();
+
+        if (files == null)
+            return Collections.emptyList();
+
+        return Arrays.stream(files)
+            .filter(File::isFile)
+            .filter(f -> f.getName().startsWith(PART_FILE_PREFIX))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param file Directory to check.
+     * @return {@code true} if given directory is shared.
+     * @throws IgniteException If given directory doesn't match the cache pattern.
+     */
+    public static boolean isSharedGroup(File file) {
+        String name = file.getName();
+
+        if (name.startsWith(CACHE_GRP_DIR_PREFIX))
+            return true;
+        else if (name.startsWith(CACHE_DIR_PREFIX))
+            return false;
+        else
+            throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + file);
+    }
+
+    /**
+     * @param dir Cache directory on disk.
+     * @return Cache or cache group name.
+     */
+    public static String cacheGroupName(File dir) {
+        return isSharedGroup(dir) ?
+            dir.getName().replaceFirst("^" + CACHE_GRP_DIR_PREFIX, "") :

Review comment:
       Fixed.
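
For readers following the file-name helpers quoted in this hunk, a minimal stand-alone sketch of how `partId` and `cacheGroupName` map on-disk names to ids and group names. This is not the Ignite code: the class name `PartitionFileNames` is hypothetical, and the constant values below are assumptions standing in for the constants defined in `FilePageStoreManager` and `PageIdAllocator`. Unlike the quoted hunk, the sketch also rejects a partition file name that has no extension instead of failing inside `substring`.

```java
import java.io.File;

/** Hypothetical stand-in for the helpers quoted above; not the Ignite class. */
class PartitionFileNames {
    // Assumed values; the real constants live in FilePageStoreManager / PageIdAllocator.
    static final String PART_FILE_PREFIX = "part-";
    static final String INDEX_FILE_NAME = "index.bin";
    static final int INDEX_PARTITION = 0xFFFF;
    static final String CACHE_DIR_PREFIX = "cache-";
    static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";

    /** Parses a partition id out of a partition file name such as "part-12.bin". */
    static int partId(String partFileName) {
        if (partFileName.equals(INDEX_FILE_NAME))
            return INDEX_PARTITION;

        if (partFileName.startsWith(PART_FILE_PREFIX)) {
            int dot = partFileName.indexOf('.');

            // Require at least one character between the prefix and the extension.
            if (dot > PART_FILE_PREFIX.length())
                return Integer.parseInt(partFileName.substring(PART_FILE_PREFIX.length(), dot));
        }

        throw new IllegalStateException("Illegal partition file name: " + partFileName);
    }

    /** Strips the cache or cache group prefix from a directory name. */
    static String cacheGroupName(File dir) {
        String name = dir.getName();

        if (name.startsWith(CACHE_GRP_DIR_PREFIX))
            return name.substring(CACHE_GRP_DIR_PREFIX.length());

        if (name.startsWith(CACHE_DIR_PREFIX))
            return name.substring(CACHE_DIR_PREFIX.length());

        throw new IllegalArgumentException("Directory doesn't match the cache or cache group prefix: " + dir);
    }

    public static void main(String[] args) {
        System.out.println(partId("part-12.bin"));
        System.out.println(cacheGroupName(new File("cacheGroup-grp1")));
    }
}
```

Using `substring` on a known prefix avoids building a regular expression from the prefix, which is one alternative to the `replaceFirst("^" + prefix, "")` form in the quoted hunk.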







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578574179



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,

Review comment:
       Fixed.
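
The two-stage flow in the quoted `checkSnapshot` (collect metadata, then verify partitions, and turn a verification-specific exception into a result instead of a failure) can be sketched with plain `CompletableFuture`. This is an illustration only: `TwoStageCheck`, `VerifyException` and the `String` results are hypothetical stand-ins, not Ignite types, and the sketch does not cover the security context handling shown in the hunk.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Hypothetical sketch of a two-stage asynchronous check; not the Ignite implementation. */
class TwoStageCheck {
    /** Stand-in for a failure that should be reported as a result rather than rethrown. */
    static class VerifyException extends RuntimeException {
        VerifyException(String msg) {
            super(msg);
        }
    }

    /**
     * Runs the second stage only if the first one succeeded. A VerifyException from
     * either stage becomes a "FAILED: ..." result; any other error fails the future.
     */
    static CompletableFuture<String> check(
        CompletableFuture<String> collectMeta,
        Function<String, CompletableFuture<String>> verify
    ) {
        return collectMeta
            .thenCompose(verify)
            .handle((res, err) -> {
                if (err == null)
                    return res;

                // Dependent stages wrap the original error into CompletionException.
                Throwable cause = err instanceof CompletionException && err.getCause() != null
                    ? err.getCause() : err;

                if (cause instanceof VerifyException)
                    return "FAILED: " + cause.getMessage();

                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> res = check(
            CompletableFuture.completedFuture("meta"),
            meta -> CompletableFuture.completedFuture("verified:" + meta));

        System.out.println(res.join());
    }
}
```

Handling both stages in a single place keeps the "convert to result or propagate" decision from being duplicated per stage, which is where the quoted hunk repeats the same `instanceof` branch twice.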

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -1166,6 +1182,201 @@ public void testClusterSnapshotInMemoryFail() throws Exception {
             "Snapshots on an in-memory clusters are not allowed.");
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheck() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertPartitionsSame(res);
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedPart() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+        assertTrue(part0.toFile().delete());
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Snapshot data doesn't contain required cache group partition");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedGroup() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path dir = Files.walk(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath())
+            .filter(d -> d.toFile().getName().equals(cacheDirName(dfltCacheCfg)))
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Cache directory not found"));
+
+        assertTrue(dir.toString(), dir.toFile().exists());
+        assertTrue(U.delete(dir));
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Snapshot data doesn't contain required cache groups");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckMissedMeta() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        File[] smfs = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        assertNotNull(smfs);
+        assertTrue(smfs[0].toString(), smfs[0].exists());
+        assertTrue(U.delete(smfs[0]));
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertFalse(F.isEmpty(res.exceptions()));
+        assertContains(log, b.toString(), "Some metadata is missing from the snapshot");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckWithNodeFilter() throws Exception {
+        IgniteEx ig0 = startGridsWithoutCache(3);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+            ig0.getOrCreateCache(txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+                .setNodeFilter(node -> node.consistentId().toString().endsWith("0"))).put(i, i);
+        }
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        IdleVerifyResultV2 res = snp(ig0).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        assertTrue(F.isEmpty(res.exceptions()));
+        assertPartitionsSame(res);
+        assertContains(log, b.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCheckPartitionCounters() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg.
+            setAffinity(new RendezvousAffinityFunction(false, 1)),
+            CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        Path part0 = U.searchFileRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(),
+            getPartitionFileName(0));
+
+        assertNotNull(part0);
+        assertTrue(part0.toString(), part0.toFile().exists());
+
+        FilePageStore pageStore = (FilePageStore)((FilePageStoreManager)ignite.context().cache().context().pageStore())
+            .getPageStoreFactory(CU.cacheId(dfltCacheCfg.getName()), false)
+            .createPageStore(getTypeByPartId(0),
+                () -> part0,
+                val -> {
+                });
+
+        ByteBuffer buff = ByteBuffer.allocate(ignite.configuration().getDataStorageConfiguration().getPageSize())
+            .order(ByteOrder.nativeOrder());
+
+        buff.clear();
+        pageStore.read(0, buff, false);
+
+        PagePartitionMetaIO io = PageIO.getPageIO(buff);
+        io.setUpdateCounter(buff, CACHE_KEYS_RANGE * 2);
+
+        pageStore.beginRecover();
+
+        buff.flip();
+        pageStore.write(PageIO.getPageId(buff), buff, 0, true);
+        pageStore.finishRecover();
+
+        pageStore.close();
+
+        IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME).get();
+
+        StringBuilder b = new StringBuilder();
+        res.print(b::append, true);
+
+        System.out.println(">>>>>> " + b);

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578554456



##########
File path: modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
##########
@@ -3086,6 +3088,31 @@ public void testCancelSnapshot() throws Exception {
             snpName -> assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "cancel", snpName)));
     }
 
+    /** @throws Exception If fails. */
+    @Test
+    public void testCheckSnapshot() throws Exception {
+        String snpName = "snapshot_02052020";
+
+        IgniteEx ig = startGrid(0);
+        ig.cluster().state(ACTIVE);
+
+        createCacheAndPreload(ig, 1000);
+
+        snp(ig).createSnapshot(snpName)
+            .get();
+
+        CommandHandler h = new CommandHandler();
+
+        assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "check", snpName));
+
+        StringBuilder sb = new StringBuilder();
+
+        ((IdleVerifyResultV2)h.getLastOperationResult()).print(sb::append, true);
+
+        assertContains(log, sb.toString(), "The check procedure has finished, no conflicts have been found");
+    }
+
+

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
##########
@@ -288,8 +245,6 @@ private void printSkippedPartitions(
 
                 printer.accept("Partition instances: " + entry.getValue() + "\n");
             }
-
-            printer.accept("\n");

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
##########
@@ -1050,6 +1050,17 @@ else if (state.active()) {
         return bltNodes;
     }
 
+    /**
+     * @return Collection of available baseline nodes.
+     */
+    public Collection<UUID> onlineBaselineNodes() {

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r582905197



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);
+
+                if (allParts.remove(meta)) {

Review comment:
       Fixed.
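
The thread does not say what "Fixed." changed here. As a hedged illustration of the index-based distribution in the quoted loop, the sketch below walks each node's list by index and skips a node once `idx` reaches the list size, since `get(idx)` is only valid for `idx < size()`. The class `RoundRobinAssign` and its generic signature are hypothetical; they stand in for the `ClusterNode` to `SnapshotMetadata` mapping and are not Ignite code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of index-by-index assignment; not the Ignite task. */
class RoundRobinAssign {
    /**
     * Assigns every distinct item to exactly one of the nodes that hold it,
     * visiting position 0 of every node's list, then position 1, and so on.
     */
    static <N, M> Map<M, N> assign(Map<N, List<M>> perNode) {
        Set<M> pending = new HashSet<>();
        perNode.values().forEach(pending::addAll);

        Map<M, N> res = new HashMap<>();

        for (int idx = 0; !pending.isEmpty(); idx++) {
            for (Map.Entry<N, List<M>> e : perNode.entrySet()) {
                List<M> items = e.getValue();

                // Skip nodes whose list is exhausted; get(idx) needs idx < size().
                if (idx >= items.size())
                    continue;

                M item = items.get(idx);

                // Only the first node that offers a still-pending item gets it.
                if (pending.remove(item))
                    res.put(item, e.getKey());

                if (pending.isEmpty())
                    break;
            }
        }

        return res;
    }

    public static void main(String[] args) {
        Map<String, List<String>> perNode = new LinkedHashMap<>();
        perNode.put("node2", List.of("metaA"));
        perNode.put("node1", List.of("metaA", "metaB"));

        System.out.println(assign(perNode));
    }
}
```

The outer loop terminates because `pending` is the union of all lists, so every item is reached at some index before the longest list is exhausted.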







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578554380



##########
File path: modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/cache/IdleVerify.java
##########
@@ -322,8 +336,32 @@ private void cacheIdleVerifyV2(
         IdleVerifyResultV2 res = executeTask(client, VisorIdleVerifyTaskV2.class, taskArg, clientCfg);
 
         logParsedArgs(taskArg, System.out::print);
+        res.print(System.out::print, false);
+
+        if (F.isEmpty(res.exceptions()))
+            return;
 
-        res.print(System.out::print);
+        try {
+            File f = new File(U.resolveWorkDirectory(U.defaultWorkDirectory(), "", false),
+                IDLE_VERIFY_FILE_PREFIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
+
+            try (PrintWriter pw = new PrintWriter(f)) {
+                res.print(System.out::print, true);

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578640684



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PagePartitionMetaIO.java
##########
@@ -101,6 +110,27 @@ public long getUpdateCounter(long pageAddr) {
         return PageUtils.getLong(pageAddr, UPDATE_CNTR_OFF);
     }
 
+    /**
+     * @param buff Page buffer.
+     * @return Partition update counter.
+     */
+    public long getUpdateCounter(ByteBuffer buff) {
+        return buff.getLong(UPDATE_CNTR_OFF);
+    }
+
+    /**
+     * @param buff Page buffer.
+     * @return Partition update counter.
+     */
+    public boolean setUpdateCounter(ByteBuffer buff, long cntr) {

Review comment:
       Fixed.
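
A minimal sketch of reading and writing a counter at a fixed offset of a page buffer with the absolute `ByteBuffer` accessors, for illustration only. `CounterAtOffset` and `CNTR_OFF` are hypothetical; the real `UPDATE_CNTR_OFF` comes from the page layout, and the meaning given to the `boolean` result here (whether the stored value changed) is an assumption, since the method body is not shown in the hunk.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical example; not the PagePartitionMetaIO code. */
public class CounterAtOffset {
    /** Made-up offset used only by this sketch. */
    static final int CNTR_OFF = 16;

    /** Absolute read: does not move the buffer position. */
    static long getCounter(ByteBuffer buf) {
        return buf.getLong(CNTR_OFF);
    }

    /** Absolute write; returns true only if the stored value changed (assumed contract). */
    static boolean setCounter(ByteBuffer buf, long cntr) {
        if (getCounter(buf) == cntr)
            return false;

        buf.putLong(CNTR_OFF, cntr);

        return true;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(4096).order(ByteOrder.nativeOrder());

        System.out.println(setCounter(buf, 42L));
        System.out.println(getCounter(buf));
        System.out.println(buf.position());
    }
}
```

Because `getLong(int)` and `putLong(int, long)` take an explicit index, the buffer's position and limit are left untouched, which matters when the same buffer is later handed to a page store read.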







[GitHub] [ignite] Mmuzaf merged pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf merged pull request #8715:
URL: https://github.com/apache/ignite/pull/8715


   





[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578631719



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read metadata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578554926



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -599,7 +657,9 @@ else if (!F.isEmpty(err) || !missed.isEmpty()) {
      * @return Future which will be completed when the snapshot will be finalized.
      */
     private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
-        if (clusterSnpReq == null)
+        SnapshotOperationRequest req0 = clusterSnpReq;

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578564358



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group names in given snapshot on local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read metadata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting the snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Snapshot metadata file.
+ */
+public class SnapshotMetadata implements Serializable {

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578584703



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();
+        clusterMetas.values().forEach(allParts::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allParts) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allParts.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() <= idx)
+                    continue;
+
+                SnapshotMetadata meta = e.getValue().get(idx);
+
+                if (allParts.remove(meta)) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.snapshotName(), meta.consistentId()),
+                        e.getKey());
+                }
+
+                if (allParts.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;
+
+        /**
+         * @param snpName Snapshot name to validate.
+         * @param consId Consistent snapshot metadata file name.
+         */
+        public VisorVerifySnapshotPartitionsJob(String snpName, String consId) {
+            this.snpName = snpName;
+            this.consId = consId;
+        }
+
+        @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
+            IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
+
+            if (log.isInfoEnabled()) {
+                log.info("Verify snapshot partitions procedure has been initiated " +
+                    "[snpName=" + snpName + ", consId=" + consId + ']');
+            }
+
+            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId);
+            Set<Integer> grps = new HashSet<>(meta.partitions().keySet());
+            Set<T2<File, File>> pairs = new HashSet<>();
+
+            for (File dir : snpMgr.snapshotCacheDirectories(snpName, consId)) {
+                int grpId = CU.cacheId(cacheGroupName(dir));
+
+                if (!grps.remove(grpId))
+                    continue;
+
+                Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+
+                for (File part : cachePartitions(dir)) {
+                    int partId = partId(part.getName());
+
+                    if (!parts.remove(partId))
+                        continue;
+
+                    pairs.add(new T2<>(dir, part));
+                }
+
+                if (!parts.isEmpty()) {
+                    throw new IgniteException("Snapshot data doesn't contain required cache group partition " +
+                        "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId +
+                        ", missed=" + parts + ", meta=" + meta + ']');
+                }
+            }
+
+            if (!grps.isEmpty()) {
+                throw new IgniteException("Snapshot data doesn't contain required cache groups " +
+                    "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId +
+                    ", meta=" + meta + ']');
+            }
+
+            Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+            ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
+                .order(ByteOrder.nativeOrder()));
+
+            try {
+                U.doInParallel(
+                    ignite.context().getSystemExecutorService(),
+                    pairs,
+                    pair -> {
+                        String grpName = pair.get1().getName();

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578567606



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return {@code true} if snapshot is OK.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory

Review comment:
       Fixed.
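
       For reference, a minimal sketch of the two-stage flow in `checkSnapshot` above, with the duplicated error handling folded into one place. It uses plain `java.util.concurrent.CompletableFuture` instead of Ignite's internal futures, and `VerifyException`, `Result` and `check` are hypothetical stand-ins (for `IgniteSnapshotVerifyException`, `IdleVerifyResultV2` and the method itself), not the code in this pull request.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class SnapshotCheckSketch {
    /** Hypothetical stand-in for IgniteSnapshotVerifyException. */
    static class VerifyException extends RuntimeException {
        final List<String> problems;

        VerifyException(List<String> problems) {
            super("Snapshot verification failed");
            this.problems = problems;
        }
    }

    /** Hypothetical stand-in for IdleVerifyResultV2. */
    static class Result {
        final List<String> problems;

        Result(List<String> problems) {
            this.problems = problems;
        }

        boolean ok() {
            return problems.isEmpty();
        }
    }

    /**
     * Runs the metadata stage, then the partition verification stage.
     * A VerifyException from either stage becomes a regular Result,
     * any other error fails the returned future.
     */
    static CompletableFuture<Result> check(
        CompletableFuture<String> collectMeta,
        Function<String, CompletableFuture<Result>> verifyParts
    ) {
        return collectMeta
            .thenCompose(verifyParts)
            .handle((res, err) -> {
                if (err == null)
                    return res;

                // Dependent stages report failures wrapped into CompletionException.
                Throwable cause = err instanceof CompletionException && err.getCause() != null
                    ? err.getCause() : err;

                if (cause instanceof VerifyException)
                    return new Result(((VerifyException)cause).problems);

                throw new CompletionException(cause);
            });
    }
}
```

       With this shape a verification problem found at either stage reaches the caller as a result to print, and only unexpected errors fail the future.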







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578574961



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allParts = new HashSet<>();

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r582891511



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allMetas = new HashSet<>();
+        clusterMetas.values().forEach(allMetas::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allMetas) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allMetas.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                Optional<SnapshotMetadata> meta = e.getValue().stream()
+                    .filter(allMetas::contains)
+                    .findFirst();
+
+                if (meta.isPresent() && allMetas.remove(meta.get())) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.get().snapshotName(), meta.get().consistentId()),
+                        e.getKey());
+                }
+
+                if (allMetas.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allMetas = new HashSet<>();
+        clusterMetas.values().forEach(allMetas::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allMetas) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allMetas.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                Optional<SnapshotMetadata> meta = e.getValue().stream()
+                    .filter(allMetas::contains)
+                    .findFirst();
+
+                if (meta.isPresent() && allMetas.remove(meta.get())) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.get().snapshotName(), meta.get().consistentId()),
+                        e.getKey());
+                }
+
+                if (allMetas.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;

Review comment:
       Fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2;
+import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2;
+import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cachePartitionFiles;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getTypeByPartId;
+import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
+
+/** */
+@GridInternal
+public class SnapshotPartitionsVerifyTask
+    extends ComputeTaskAdapter<Map<ClusterNode, List<SnapshotMetadata>>, IdleVerifyResultV2> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private IgniteEx ignite;
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        @Nullable Map<ClusterNode, List<SnapshotMetadata>> clusterMetas
+    ) throws IgniteException {
+        if (!subgrid.containsAll(clusterMetas.keySet())) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some of Ignite nodes left the cluster during the snapshot verification " +
+                "[curr=" + F.viewReadOnly(subgrid, F.node2id()) +
+                ", init=" + F.viewReadOnly(clusterMetas.keySet(), F.node2id()) + ']')));
+        }
+
+        Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
+        Set<SnapshotMetadata> allMetas = new HashSet<>();
+        clusterMetas.values().forEach(allMetas::addAll);
+
+        Set<String> missed = null;
+
+        for (SnapshotMetadata meta : allMetas) {
+            if (missed == null)
+                missed = new HashSet<>(meta.baselineNodes());
+
+            missed.remove(meta.consistentId());
+
+            if (missed.isEmpty())
+                break;
+        }
+
+        if (!missed.isEmpty()) {
+            throw new IgniteSnapshotVerifyException(F.asMap(ignite.localNode(),
+                new IgniteException("Some metadata is missing from the snapshot: " + missed)));
+        }
+
+        for (int idx = 0; !allMetas.isEmpty(); idx++) {
+            for (Map.Entry<ClusterNode, List<SnapshotMetadata>> e : clusterMetas.entrySet()) {
+                if (e.getValue().size() < idx)
+                    continue;
+
+                Optional<SnapshotMetadata> meta = e.getValue().stream()
+                    .filter(allMetas::contains)
+                    .findFirst();
+
+                if (meta.isPresent() && allMetas.remove(meta.get())) {
+                    jobs.put(new VisorVerifySnapshotPartitionsJob(meta.get().snapshotName(), meta.get().consistentId()),
+                        e.getKey());
+                }
+
+                if (allMetas.isEmpty())
+                    break;
+            }
+        }
+
+        return jobs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
+        return VerifyBackupPartitionsTaskV2.reduce0(results);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
+        // Handle all exceptions during the `reduce` operation.
+        return ComputeJobResultPolicy.WAIT;
+    }
+
+    /** Job that collects update counters of snapshot partitions on the node it executes. */
+    private static class VisorVerifySnapshotPartitionsJob extends ComputeJobAdapter {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Injected logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Snapshot name to validate. */
+        private String snpName;
+
+        /** Consistent snapshot metadata file name. */
+        private String consId;
+
+        /**
+         * @param snpName Snapshot name to validate.
+         * @param consId Consistent snapshot metadata file name.
+         */
+        public VisorVerifySnapshotPartitionsJob(String snpName, String consId) {
+            this.snpName = snpName;
+            this.consId = consId;
+        }
+
+        @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
+            IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
+
+            if (log.isInfoEnabled()) {
+                log.info("Verify snapshot partitions procedure has been initiated " +
+                    "[snpName=" + snpName + ", consId=" + consId + ']');
+            }
+
+            SnapshotMetadata meta = snpMgr.readSnapshotMetadata(snpName, consId);
+            Set<Integer> grps = new HashSet<>(meta.partitions().keySet());
+            Set<File> partFiles = new HashSet<>();
+
+            for (File dir : snpMgr.snapshotCacheDirectories(snpName, meta.folderName())) {
+                int grpId = CU.cacheId(cacheGroupName(dir));
+
+                if (!grps.remove(grpId))
+                    continue;
+
+                Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+
+                for (File part : cachePartitionFiles(dir)) {
+                    int partId = partId(part.getName());
+
+                    if (!parts.remove(partId))
+                        continue;
+
+                    partFiles.add(part);
+                }
+
+                if (!parts.isEmpty()) {
+                    throw new IgniteException("Snapshot data doesn't contain required cache group partition " +
+                        "[grpId=" + grpId + ", snpName=" + snpName + ", consId=" + consId +
+                        ", missed=" + parts + ", meta=" + meta + ']');
+                }
+            }
+
+            if (!grps.isEmpty()) {
+                throw new IgniteException("Snapshot data doesn't contain required cache groups " +
+                    "[grps=" + grps + ", snpName=" + snpName + ", consId=" + consId +
+                    ", meta=" + meta + ']');
+            }
+
+            Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+            ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize())
+                .order(ByteOrder.nativeOrder()));
+
+            try {
+                U.doInParallel(
+                    ignite.context().getSystemExecutorService(),
+                    partFiles,
+                    part -> {
+                        String grpName = cacheGroupName(part.getParentFile());
+                        int grpId = CU.cacheId(grpName);
+                        int partId = partId(part.getName());
+
+                        FilePageStoreManager storeMgr = (FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+                        try {
+                            try (FilePageStore pageStore = (FilePageStore)storeMgr.getPageStoreFactory(grpId, false)
+                                .createPageStore(getTypeByPartId(partId),
+                                    part::toPath,
+                                    val -> {
+                                    })
+                            ) {
+                                ByteBuffer pageBuff = buff.get();
+                                pageBuff.clear();
+                                pageStore.read(0, pageBuff, true);
+
+                                long pageAddr = GridUnsafe.bufferAddress(pageBuff);
+
+                                PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+                                GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr));
+
+                                if (partState != OWNING)
+                                    throw new IgniteCheckedException("Snapshot partitions must be in OWNING state only: " + partState);

Review comment:
       Fixed.
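
For readers following the hunk above: the group and partition checks amount to set subtraction. Each id found on disk is removed from the expected set, and whatever remains is reported as missing. Below is a minimal standalone sketch of that idea, assuming plain integer ids; the `PartitionCoverage` class and its fields are hypothetical names used only for illustration and are not part of the Ignite code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: compares expected partition ids with the ids found on disk. */
class PartitionCoverage {
    /** Ids that were both expected and found; these would be passed on to verification. */
    final List<Integer> accepted = new ArrayList<>();

    /** Expected ids for which no partition file was found. */
    final Set<Integer> missing;

    PartitionCoverage(Set<Integer> expected, List<Integer> foundOnDisk) {
        // Copy the expected set so the caller's metadata is not modified.
        missing = new TreeSet<>(expected);

        for (int partId : foundOnDisk) {
            // Files that the metadata does not mention are skipped, as in the hunk above.
            if (missing.remove(partId))
                accepted.add(partId);
        }
    }

    public static void main(String[] args) {
        PartitionCoverage c = new PartitionCoverage(
            new HashSet<>(Arrays.asList(0, 1, 2)),
            Arrays.asList(0, 2, 7));

        System.out.println("accepted=" + c.accepted + ", missing=" + c.missing);
    }
}
```

A non-empty `missing` set corresponds to the point where the reviewed code throws an exception naming the absent partitions.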

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
##########
@@ -322,9 +275,9 @@ private void printConflicts(Consumer<String> printer) {
 
                 printer.accept("Partition instances: " + entry.getValue() + "\n");
             }
-
-            printer.accept("\n");
         }
+
+        printer.accept("\n");

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r582892243



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/PartitionHashRecordV2.java
##########
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-

Review comment:
       Fixed.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
##########
@@ -130,7 +131,7 @@
                 .setPageSize(4096))
             .setCacheConfiguration(dfltCacheCfg)
             .setClusterStateOnStart(INACTIVE)
-            .setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT)
+            .setIncludeEventTypes(EVTS_ALL)

Review comment:
       Fixed.







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578566227



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return Future that completes with the snapshot check result.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group directories of the given snapshot on the local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read medata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting the snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {

Review comment:
       Fixed.
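
For context, `checkSnapshot` in the hunk above runs two tasks in sequence and converts a verification failure from either stage into a regular result instead of a failed future. The sketch below shows the same pattern with the JDK's `CompletableFuture` as a stand-in for Ignite's internal futures. `TwoStageCheck`, `VerifyException` and `Result` are hypothetical stand-ins for illustration only; they are not the Ignite types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

class TwoStageCheck {
    /** Hypothetical stand-in for a verification-specific exception. */
    static class VerifyException extends RuntimeException {
        VerifyException(String msg) {
            super(msg);
        }
    }

    /** Hypothetical stand-in for the check result; a null problem means the check passed. */
    static class Result {
        final String problem;

        Result(String problem) {
            this.problem = problem;
        }
    }

    /**
     * Runs the metadata stage, then the verify stage. A VerifyException from either
     * stage becomes a Result describing the problem; any other error fails the future.
     */
    static CompletableFuture<Result> check(
        Supplier<CompletableFuture<String>> collectMeta,
        Function<String, CompletableFuture<Result>> verify
    ) {
        return collectMeta.get()
            .thenCompose(verify)
            .exceptionally(err -> {
                // Dependent stages wrap the original error in CompletionException.
                Throwable cause = err instanceof CompletionException && err.getCause() != null
                    ? err.getCause() : err;

                if (cause instanceof VerifyException)
                    return new Result(cause.getMessage());

                throw new CompletionException(cause);
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failedMeta = new CompletableFuture<>();
        failedMeta.completeExceptionally(new VerifyException("metadata mismatch"));

        Result res = check(() -> failedMeta, m -> CompletableFuture.completedFuture(new Result(null))).join();

        System.out.println("problem=" + res.problem);
    }
}
```

Handling the error once at the end of the chain avoids repeating the `instanceof` branch for each stage, which is one possible way to shorten the nested listeners; whether that fits Ignite's internal future API is not something this sketch establishes.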







[GitHub] [ignite] Mmuzaf commented on a change in pull request #8715: IGNITE-13725 add snapshot check distributed procedure

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #8715:
URL: https://github.com/apache/ignite/pull/8715#discussion_r578564240



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -738,6 +798,202 @@ public void cancelLocalSnapshotTask(String name) {
         }
     }
 
+    /**
+     * @param name Snapshot name.
+     * @return Future that completes with the snapshot check result.
+     */
+    public IgniteInternalFuture<IdleVerifyResultV2> checkSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        GridKernalContext kctx0 = cctx.kernalContext();
+        GridFutureAdapter<IdleVerifyResultV2> res = new GridFutureAdapter<>();
+
+        kctx0.security().authorize(ADMIN_SNAPSHOT);
+
+        kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+        kctx0.task().execute(SnapshotMetadataCollectorTask.class, name)
+            .listen(f0 -> {
+                if (f0.error() == null) {
+                    kctx0.task().setThreadContext(TC_SKIP_AUTH, true);
+                    kctx0.task().execute(SnapshotPartitionsVerifyTask.class, f0.result())
+                        .listen(f1 -> {
+                            if (f1.error() == null)
+                                res.onDone(f1.result());
+                            else if (f1.error() instanceof IgniteSnapshotVerifyException)
+                                res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f1.error()).exceptions()));
+                            else
+                                res.onDone(f1.error());
+                        });
+                }
+                else {
+                    if (f0.error() instanceof IgniteSnapshotVerifyException)
+                        res.onDone(new IdleVerifyResultV2(((IgniteSnapshotVerifyException)f0.error()).exceptions()));
+                    else
+                        res.onDone(f0.error());
+                }
+            });
+
+        return res;
+    }
+
+    /**
+     * @param part Partition file.
+     * @param grpId Cache group id.
+     * @param partId Partition id.
+     * @param pageBuff Page buffer to read data into.
+     * @param updCntr Partition update counter value consumer.
+     * @param partSize Partition size value consumer.
+     */
+    public void readSnapshotPartitionMeta(
+        File part,
+        int grpId,
+        int partId,
+        ByteBuffer pageBuff,
+        LongConsumer updCntr,
+        LongConsumer partSize
+    ) {
+        try {
+            FilePageStore pageStore = (FilePageStore)storeFactory
+                .apply(grpId, false)
+                .createPageStore(getTypeByPartId(partId),
+                    part::toPath,
+                    val -> {
+                    });
+
+            pageBuff.clear();
+            pageStore.read(0, pageBuff, true);
+
+            PagePartitionMetaIO io = PageIO.getPageIO(pageBuff);
+            GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageBuff));
+
+            assert partState == OWNING : "Snapshot partitions must be in OWNING state only: " + partState;
+
+            long updateCntr = io.getUpdateCounter(pageBuff);
+            long size = io.getSize(pageBuff);
+
+            updCntr.accept(updateCntr);
+            partSize.accept(size);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Partition [grpId=" + grpId
+                    + ", id=" + partId
+                    + ", counter=" + updateCntr
+                    + ", size=" + size + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Consistent id.
+     * @return The list of cache or cache group directories of the given snapshot on the local node.
+     */
+    public List<File> snapshotCacheDirectories(String snpName, String consId) {
+        File snpDir = snapshotLocalDir(snpName);
+
+        if (!snpDir.exists())
+            return Collections.emptyList();
+
+        return cacheDirectories(new File(snpDir, databaseRelativePath(U.maskForFileName(consId))));
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @param consId Node consistent id to read metadata for.
+     * @return Snapshot metadata instance.
+     */
+    public SnapshotMetadata readSnapshotMetadata(String snpName, String consId) {
+        return readSnapshotMetadata(new File(snapshotLocalDir(snpName),
+                U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT),
+            marsh,
+            cctx.gridConfig());
+    }
+
+    /**
+     * @param smf File denoting the snapshot metafile.
+     * @return Snapshot metadata instance.
+     */
+    private static SnapshotMetadata readSnapshotMetadata(File smf, Marshaller marsh, IgniteConfiguration cfg) {
+        if (!smf.exists())
+            throw new IgniteException("Snapshot metafile cannot be read because it doesn't exist: " + smf);
+
+        String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
+            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cfg));
+
+            assert U.maskForFileName(meta.consistentId()).equals(smfName) :
+                "smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId());
+
+            return meta;
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
+                smf.getAbsolutePath() + "]", e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return List of snapshot metadata for the given snapshot name on the local node.
+     * If the snapshot was taken on the local node, the metadata for the local node
+     * is placed first in the list.
+     */
+    public List<SnapshotMetadata> readSnapshotMetadatas(String snpName) {
+        A.notNullOrEmpty(snpName, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(snpName), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        File[] smfs = snapshotLocalDir(snpName).listFiles((dir, name) ->
+            name.toLowerCase().endsWith(SNAPSHOT_METAFILE_EXT));
+
+        if (smfs == null)
+            throw new IgniteException("Snapshot directory doesn't exist or an I/O error occurred during directory read.");
+
+        Map<String, SnapshotMetadata> metasMap = new HashMap<>();
+        SnapshotMetadata prev = null;
+
+        for (File smf : smfs) {
+            SnapshotMetadata curr = readSnapshotMetadata(smf, marsh, cctx.gridConfig());
+
+            assert prev == null || sameSnapshotMetadata(prev, curr) : "prev=" + prev + ", curr=" + curr;
+
+            metasMap.put(curr.consistentId(), curr);
+
+            prev = curr;
+        }
+
+        SnapshotMetadata currNodeSmf = metasMap.remove(cctx.localNode().consistentId().toString());
+
+        // Snapshot metadata for the local node must be first in the result list.
+        if (currNodeSmf == null)
+            return new ArrayList<>(metasMap.values());
+        else {
+            List<SnapshotMetadata> result = new ArrayList<>();
+
+            result.add(currNodeSmf);
+            result.addAll(metasMap.values());
+
+            return result;
+        }
+    }
+
+    /**
+     * @param meta1 First snapshot metadata.
+     * @param meta2 Second snapshot metadata.
+     * @return {@code true} if the given metadata instances belong to the same snapshot.
+     */
+    public static boolean sameSnapshotMetadata(SnapshotMetadata meta1, SnapshotMetadata meta2) {

Review comment:
       Fixed.
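
As an illustration of the ordering rule in `readSnapshotMetadatas` above (the local node's metadata first, then the rest), here is a minimal standalone sketch. It uses strings in place of `SnapshotMetadata`, and the `LocalFirst` class and `localFirst` method are hypothetical names, not Ignite code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LocalFirst {
    /**
     * Returns the map values with the local node's entry (if any) moved to the front.
     * The input map is not modified.
     */
    static List<String> localFirst(Map<String, String> metaByConsId, String locConsId) {
        // LinkedHashMap keeps the remaining entries in the caller's insertion order.
        Map<String, String> rest = new LinkedHashMap<>(metaByConsId);
        String loc = rest.remove(locConsId);

        List<String> res = new ArrayList<>(metaByConsId.size());

        if (loc != null)
            res.add(loc);

        res.addAll(rest.values());

        return res;
    }

    public static void main(String[] args) {
        Map<String, String> metas = new LinkedHashMap<>();
        metas.put("nodeA", "metaA");
        metas.put("nodeB", "metaB");
        metas.put("nodeC", "metaC");

        System.out.println(localFirst(metas, "nodeB"));
    }
}
```

The reviewed code collects the metadata into a `HashMap`, so the order of the non-local entries there is unspecified; the `LinkedHashMap` in this sketch is a deliberate difference that makes the output deterministic.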



