Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/04/21 06:44:01 UTC

[GitHub] [ignite] agoncharuk commented on a change in pull request #7607: IGNITE-11073: Create consistent partitions copy on each cluster node

agoncharuk commented on a change in pull request #7607:
URL: https://github.com/apache/ignite/pull/7607#discussion_r411896647



##########
File path: modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.util.List;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * This interface provides functionality for creating cluster-wide cache data snapshots.
+ * <p>
+ * Current limitations:
+ * <ul>
+ * <li>Snapshot will trigger a PME (partition map exchange) to run itself.</li>
+ * <li>Snapshot will be taken from all registered persistent caches to
+ * guarantee data consistency between them.</li>
+ * <li>Snapshot must be restored manually on a stopped cluster by copying data
+ * to the working directory on each cluster node.</li>
+ * </ul>
+ */
+public interface IgniteSnapshot {
+    /**
+     * @return List of all known snapshots.
+     */
+    public List<String> getSnapshots();

Review comment:
       Should we return an object instead of a String, in case we add more information about a snapshot in the future?
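
       For illustration, a descriptor along these lines would keep the API extensible (the SnapshotInfo name and its fields are hypothetical, not part of this PR):

           /** Hypothetical snapshot descriptor that can grow new attributes without breaking the API. */
           public interface SnapshotInfo {
               /** @return Snapshot name (the only attribute exposed today). */
               public String name();

               /** @return Snapshot creation timestamp, if it gets recorded later. */
               public long createdAt();
           }

       The accessor would then become public List<SnapshotInfo> getSnapshots(), and new attributes could be added later without changing the method signature.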

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -0,0 +1,895 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.FullMessage;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Cluster-wide snapshot test.
+ */
+public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
+    /** Time to wait while rebalance may happen. */
+    private static final long REBALANCE_AWAIT_TIME = GridTestUtils.SF.applyLB(10_000, 3_000);
+
+    /** Cache configuration for test. */
+    private static CacheConfiguration<Integer, Integer> atomicCcfg = new CacheConfiguration<Integer, Integer>("atomicCacheName")
+        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+        .setBackups(2)
+        .setAffinity(new RendezvousAffinityFunction(false)
+            .setPartitions(CACHE_PARTS_COUNT));
+
+    /** {@code true} if the node should be started in a separate JVM. */
+    protected volatile boolean jvm;
+
+    /** @throws Exception If fails. */
+    @Before
+    @Override public void beforeTestSnapshot() throws Exception {
+        super.beforeTestSnapshot();
+
+        jvm = false;
+    }
+
+    /**
+     * Take a snapshot of the whole cluster and check snapshot consistency when the
+     * cluster tx load starts on a new topology version.
+     * Note: Client nodes and server nodes not in the baseline topology must not be affected.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testConsistentClusterSnapshotLoadNewTopology() throws Exception {
+        int grids = 3;
+        String snpName = "backup23012020";
+        AtomicInteger atKey = new AtomicInteger(CACHE_KEYS_RANGE);
+        AtomicInteger txKey = new AtomicInteger(CACHE_KEYS_RANGE);
+
+        IgniteEx ignite = startGrids(grids);
+        startClientGrid();
+
+        ignite.cluster().baselineAutoAdjustEnabled(false);
+        ignite.cluster().state(ACTIVE);
+
+        // Start node not in baseline.
+        IgniteEx notBltIgnite = startGrid(grids);
+        File locSnpDir = snp(notBltIgnite).snapshotLocalDir(SNAPSHOT_NAME);
+        String notBltDirName = folderName(notBltIgnite);
+
+        IgniteCache<Integer, Integer> atCache = ignite.createCache(atomicCcfg);
+
+        for (int idx = 0; idx < CACHE_KEYS_RANGE; idx++) {
+            atCache.put(atKey.incrementAndGet(), -1);
+            ignite.cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), -1);
+        }
+
+        forceCheckpoint();
+
+        CountDownLatch loadLatch = new CountDownLatch(1);
+
+        ignite.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            /** {@inheritDoc} */
+            @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                // First discovery custom event will be a snapshot operation.
+                assertTrue(isSnapshotOperation(fut.firstEvent()));
+                assertTrue("Snapshot must use pme-free exchange", fut.context().exchangeFreeSwitch());
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)fut.firstEvent()).customMessage();
+
+                assertNotNull(msg);
+
+                if (msg instanceof SnapshotDiscoveryMessage)
+                    loadLatch.countDown();
+            }
+        });
+
+        // Start cache load.
+        IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(loadLatch);
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int atIdx = rnd.nextInt(grids);
+
+                    // Zero out the sign bit.
+                    grid(atIdx).cache(atomicCcfg.getName()).put(txKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+
+                    int txIdx = rnd.nextInt(grids);
+
+                    grid(txIdx).cache(DEFAULT_CACHE_NAME).put(atKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new RuntimeException(e);
+            }
+        }, 3, "cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(snpName);
+
+            U.await(loadLatch, 10, TimeUnit.SECONDS);
+
+            fut.get();
+        }
+        finally {
+            loadFut.cancel();
+        }
+
+        // The cluster could simply be deactivated, but we must test snapshot restore when binary recovery has also occurred.
+        stopAllGrids();
+
+        assertTrue("Snapshot directory must be empty for node not in baseline topology: " + notBltDirName,
+            !searchDirectoryRecursively(locSnpDir.toPath(), notBltDirName).isPresent());
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, snpName);
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + DEFAULT_CACHE_NAME,
+            CACHE_KEYS_RANGE, snpIg0.cache(DEFAULT_CACHE_NAME).size());
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + atomicCcfg.getName(),
+            CACHE_KEYS_RANGE, snpIg0.cache(atomicCcfg.getName()).size());
+
+        snpIg0.cache(DEFAULT_CACHE_NAME).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + DEFAULT_CACHE_NAME + ", entry=" + e +']', (Integer)e.getValue() < 0));
+
+        snpIg0.cache(atomicCcfg.getName()).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + atomicCcfg.getName() + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+    }
+

Review comment:
       We need to add tests checking that SQL indexes are successfully restored and usable, covering both statically configured and dynamically created indexes.
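
       A rough sketch of such a test (the cache, table, and index names are illustrative; it assumes the Account class is SQL-visible via setIndexedTypes, which requires the ignite-indexing module and an org.apache.ignite.cache.query.SqlFieldsQuery import, and reuses the SNAPSHOT_NAME/startGridsFromSnapshot helpers from this class):

           /** Sketch: a dynamically created SQL index must be restored and used after snapshot restore. */
           @Test
           public void testClusterSnapshotWithDynamicSqlIndex() throws Exception {
               IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);

               IgniteCache<Integer, Account> cache = ignite.createCache(
                   new CacheConfiguration<Integer, Account>("accounts")
                       .setIndexedTypes(Integer.class, Account.class));

               for (int i = 0; i < CACHE_KEYS_RANGE; i++)
                   cache.put(i, new Account(i, i));

               // The dynamically created index must become part of the snapshot as well.
               cache.query(new SqlFieldsQuery("CREATE INDEX acc_balance_idx ON Account(balance)")).getAll();

               ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();

               stopAllGrids();

               IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);

               // EXPLAIN output should reference the restored index if it is actually used.
               String plan = snp.cache("accounts")
                   .query(new SqlFieldsQuery("EXPLAIN SELECT _KEY FROM Account WHERE balance > 0"))
                   .getAll().get(0).get(0).toString();

               assertTrue("Dynamically created index must survive restore: " + plan,
                   plan.toUpperCase().contains("ACC_BALANCE_IDX"));
           }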

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
##########
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageWriteListener;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
+
+/**
+ *
+ */
+class SnapshotFutureTask extends GridFutureAdapter<Boolean> implements DbCheckpointListener {
+    /** Shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Node id which caused the snapshot operation. */
+    private final UUID srcNodeId;
+
+    /** Unique identifier of snapshot process. */
+    private final String snpName;
+
+    /** Snapshot working directory on file system. */
+    private final File tmpSnpWorkDir;
+
+    /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    private final FileIOFactory ioFactory;
+
+    /**
+     * Size of each cache partition file in bytes.
+     * The value is greater than zero only for partitions in the OWNING state.
+     * Information is collected under the checkpoint write lock.
+     */
+    private final Map<GroupPartitionId, Long> partFileLengths = new HashMap<>();
+
+    /**
+     * Map of partitions to snapshot and their corresponding delta PageStores.
+     * Writers are pinned to the snapshot context and controlled by the
+     * partition processing supplier.
+     */
+    private final Map<GroupPartitionId, PageStoreSerialWriter> partDeltaWriters = new HashMap<>();
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    private final SnapshotSender snpSndr;
+
+    /**
+     * Requested map of cache groups and their partitions to include into the snapshot. If the set of partitions
+     * is {@code null}, then all OWNING partitions for the given cache groups will be included into the snapshot.
+     * In this case, if all partitions are in the OWNING state, the index partition will also be included.
+     * <p>
+     * If partitions for a particular cache group are not provided, then they will be collected and added
+     * on checkpoint under the write lock.
+     */
+    private final Map<Integer, Set<Integer>> parts;
+
+    /** Cache group and corresponding partitions collected under the checkpoint write lock. */
+    private final Map<Integer, Set<Integer>> processed = new HashMap<>();
+
+    /** Checkpoint end future. */
+    private final CompletableFuture<Boolean> cpEndFut = new CompletableFuture<>();
+
+    /** Future to wait until the checkpoint mark phase is finished and snapshot tasks are scheduled. */
+    private final GridFutureAdapter<Void> startedFut = new GridFutureAdapter<>();
+
+    /** Absolute path to save intermediate results of cache partitions of this node. */
+    private volatile File tmpConsIdDir;
+
+    /** Future which will be completed when task requested to be closed. Will be executed on system pool. */
+    private volatile CompletableFuture<Void> closeFut;
+
+    /** An exception which has occurred during snapshot processing. */
+    private final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /** Flag indicating that the task has already been scheduled on checkpoint. */
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    /**
+     * @param e Exception to complete the finished snapshot task with.
+     */
+    public SnapshotFutureTask(IgniteCheckedException e) {
+        A.notNull(e, "Exception for a finished snapshot task must be not null");
+
+        cctx = null;
+        log = null;
+        snpName = null;
+        srcNodeId = null;
+        tmpSnpWorkDir = null;
+        snpSndr = null;
+
+        err.set(e);
+        startedFut.onDone(e);
+        onDone(e);
+        parts = null;
+        ioFactory = null;
+        locBuff = null;
+    }
+
+    /**
+     * @param snpName Unique identifier of the snapshot task.
+     * @param ioFactory Factory for working with delta files.
+     * @param parts Map of cache groups and their partitions to include into the snapshot; if the set of partitions
+     * is {@code null}, then all OWNING partitions for the given cache groups will be included into the snapshot.
+     */
+    public SnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts,
+        ThreadLocal<ByteBuffer> locBuff
+    ) {
+        A.notNull(snpName, "Snapshot name cannot be empty or null");
+        A.notNull(snpSndr, "Snapshot sender which handles execution tasks must be not null");
+        A.notNull(snpSndr.executor(), "Executor service must be not null");
+
+        this.parts = parts;
+        this.cctx = cctx;
+        this.log = cctx.logger(SnapshotFutureTask.class);
+        this.snpName = snpName;
+        this.srcNodeId = srcNodeId;
+        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+        this.snpSndr = snpSndr;
+        this.ioFactory = ioFactory;
+        this.locBuff = locBuff;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Node id which triggered this operation.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * @return Type of snapshot operation.
+     */
+    public Class<? extends SnapshotSender> type() {
+        return snpSndr.getClass();
+    }
+
+    /**
+     * @return Set of cache groups included into snapshot operation.
+     */
+    public Set<Integer> affectedCacheGroups() {
+        return parts.keySet();
+    }
+
+    /**
+     * @param th An exception which occurred during snapshot processing.
+     */
+    public void acceptException(Throwable th) {
+        if (th == null)
+            return;
+
+        if (err.compareAndSet(null, th))
+            closeAsync();
+
+        startedFut.onDone(th);
+
+        U.warn(log, "Snapshot task has accepted exception to stop: " + th);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+        for (PageStoreSerialWriter writer : partDeltaWriters.values())
+            U.closeQuiet(writer);
+
+        snpSndr.close(err);
+
+        if (tmpConsIdDir != null)
+            U.delete(tmpConsIdDir);
+
+        // Delete the snapshot directory if no other files exist.
+        try {
+            if (U.fileCount(tmpSnpWorkDir.toPath()) == 0 || err != null)
+                U.delete(tmpSnpWorkDir.toPath());
+        }
+        catch (IOException e) {
+            log.error("Snapshot directory doesn't exist [snpName=" + snpName + ", dir=" + tmpSnpWorkDir + ']');
+        }
+
+        if (err != null)
+            startedFut.onDone(err);
+
+        return super.onDone(res, err);
+    }
+
+    /**
+     * @throws IgniteCheckedException If fails.
+     */
+    public void awaitStarted() throws IgniteCheckedException {
+        startedFut.get();
+    }
+
+    /**
+     * @return {@code true} if the current task has been requested to stop.
+     */
+    private boolean stopping() {
+        return err.get() != null;
+    }
+
+    /**
+     * Initiates snapshot task.
+     *
+     * @return {@code true} if task started by this call.
+     */
+    public boolean start() {
+        if (stopping())
+            return false;
+
+        try {
+            if (!started.compareAndSet(false, true))
+                return false;
+
+            tmpConsIdDir = U.resolveWorkDirectory(tmpSnpWorkDir.getAbsolutePath(),
+                databaseRelativePath(cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName()),
+                false);
+
+            for (Integer grpId : parts.keySet()) {
+                CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+                if (gctx == null)
+                    throw new IgniteCheckedException("Cache group context not found: " + grpId);
+
+                if (!CU.isPersistentCache(gctx.config(), cctx.kernalContext().config().getDataStorageConfiguration()))
+                    throw new IgniteCheckedException("In-memory cache groups are not allowed to be snapshot: " + grpId);
+
+                if (gctx.config().isEncryptionEnabled())
+                    throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
+
+                // Create cache group snapshot directory on start in a single thread.
+                U.ensureDirectory(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())),
+                    "directory for snapshotting cache group",
+                    log);
+            }
+
+            startedFut.listen(f ->
+                ((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
+            );
+
+            // Listener will be removed right after first execution.
+            ((GridCacheDatabaseSharedManager)cctx.database()).addCheckpointListener(this);
+
+            if (log.isInfoEnabled()) {
+                log.info("Snapshot operation is scheduled on local node and will be handled by the checkpoint " +
+                    "listener [sctx=" + this + ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+            }
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        if (stopping())
+            return;
+
+        ctx.finishedStateFut().listen(f -> {
+            if (f.error() == null)
+                cpEndFut.complete(true);
+            else
+                cpEndFut.completeExceptionally(f.error());
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        // Write lock is held. Partition page counters have been collected under the write lock.
+        if (stopping())
+            return;
+
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                int grpId = e.getKey();
+                Set<Integer> grpParts = e.getValue();
+
+                GridDhtPartitionTopology top = cctx.cache().cacheGroup(grpId).topology();
+
+                Iterator<GridDhtLocalPartition> iter;
+
+                if (grpParts == null)
+                    iter = top.currentLocalPartitions().iterator();
+                else {
+                    if (grpParts.contains(INDEX_PARTITION)) {
+                        throw new IgniteCheckedException("Index partition cannot be included into snapshot if " +
+                            " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']');
+                    }
+
+                    iter = F.iterator(grpParts, top::localPartition, false);
+                }
+
+                Set<Integer> owning = new HashSet<>();
+                Set<Integer> missed = new HashSet<>();
+
+                // Iterate over partitions in particular cache group.
+                while (iter.hasNext()) {
+                    GridDhtLocalPartition part = iter.next();
+
+                    // Partition can be in the MOVING/RENTING state.
+                    // The index partition will be excluded if not all partitions are OWNING.
+                    // A partition with no data assigned to it hasn't been created yet.
+                    if (part.state() == GridDhtPartitionState.OWNING)
+                        owning.add(part.id());
+                    else
+                        missed.add(part.id());
+                }
+
+                if (grpParts != null) {
+                    // Partitions have been explicitly provided for the cache group, but some of them
+                    // are not in the OWNING state. Exit with an error.
+                    if (!missed.isEmpty()) {
+                        throw new IgniteCheckedException("Snapshot operation cancelled because " +
+                            "not all of the requested partitions have the OWNING state on the local node [grpId=" + grpId +
+                            ", missed=" + missed + ']');
+                    }
+                }
+                else {
+                    // Partitions have not been provided for the snapshot task; if all partitions are in the
+                    // OWNING state, the index partition must be included in the snapshot.
+                    if (!missed.isEmpty()) {
+                        log.warning("Only local cache group partitions in the OWNING state have been included in the snapshot. " +
+                            "Partitions in other states were skipped, and the index partition has also been skipped " +
+                            "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
+                    }
+                    else if (cctx.kernalContext().query().moduleEnabled())
+                        owning.add(INDEX_PARTITION);
+                }
+
+                processed.put(grpId, owning);
+            }
+
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+
+                CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+                if (gctx == null) {
+                    throw new IgniteCheckedException("Cache group context has not found " +
+                        "due to the cache group is stopped: " + grpId);
+                }
+
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
+
+                    PageStore store = ((FilePageStoreManager)cctx.pageStore()).getStore(grpId, partId);
+
+                    partDeltaWriters.put(pair,
+                        new PageStoreSerialWriter(store,
+                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+
+                    partFileLengths.put(pair, store.size());
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) {
+        if (stopping())
+            return;
+
+        assert !processed.isEmpty() : "Partitions to process must be collected during the checkpoint mark phase";
+
+        wrapExceptionIfStarted(() -> snpSndr.init(processed.values().stream().mapToInt(Set::size).sum()))
+            .run();
+
+        // The snapshot task can now be started since the checkpoint write lock has been released and
+        // no error occurred during task init.
+        if (!startedFut.onDone())
+            return;
+
+        // Submit all tasks for partitions and deltas processing.
+        List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+        if (log.isInfoEnabled())
+            log.info("Submit partition processing tasks with partition allocated lengths: " + partFileLengths);
+
+        Collection<BinaryType> binTypesCopy = cctx.kernalContext()
+            .cacheObjects()
+            .metadata(Collections.emptyList())
+            .values();
+
+        // Process binary meta.
+        futs.add(CompletableFuture.runAsync(
+            wrapExceptionIfStarted(() -> snpSndr.sendBinaryMeta(binTypesCopy)),
+            snpSndr.executor()));
+
+        List<Map<Integer, MappedName>> mappingsCopy = cctx.kernalContext()
+            .marshallerContext()
+            .getCachedMappings();
+
+        // Process marshaller meta.
+        futs.add(CompletableFuture.runAsync(
+            wrapExceptionIfStarted(() -> snpSndr.sendMarshallerMeta(mappingsCopy)),
+            snpSndr.executor()));
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+            int grpId = e.getKey();
+
+            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+            if (gctx == null) {
+                acceptException(new IgniteCheckedException("Cache group context has not found " +
+                    "due to the cache group is stopped: " + grpId));
+
+                break;
+            }
+
+            // Process the cache group configuration files.
+            futs.add(CompletableFuture.runAsync(
+                wrapExceptionIfStarted(() -> {
+                    List<File> ccfgs = storeMgr.configurationFiles(gctx.config());
+
+                    if (ccfgs == null)
+                        return;
+
+                    for (File ccfg0 : ccfgs)
+                        snpSndr.sendCacheConfig(ccfg0, cacheDirName(gctx.config()));
+                }),
+                snpSndr.executor())

Review comment:
       Looks like there is a race here: we read the configuration files from the snapshot executor thread while they can be modified by SQL ADD/DROP COLUMN commands, for example.
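
       One possible shape of a fix, as a sketch only (the cfgLock field and the sendCacheConfigBytes sender method are hypothetical names, not this PR's code): copy the file contents under a lock shared with schema operations, then hand the immutable copies to the executor.

           // Hypothetical: capture configuration bytes under a lock that SQL schema
           // changes would also take before rewriting cache configuration files.
           List<byte[]> ccfgCopies = new ArrayList<>();

           cfgLock.readLock().lock();
           try {
               for (File ccfg0 : storeMgr.configurationFiles(gctx.config()))
                   ccfgCopies.add(Files.readAllBytes(ccfg0.toPath()));
           }
           finally {
               cfgLock.readLock().unlock();
           }

           // The executor thread then works with stable copies instead of live files.
           futs.add(CompletableFuture.runAsync(
               wrapExceptionIfStarted(() -> snpSndr.sendCacheConfigBytes(ccfgCopies, cacheDirName(gctx.config()))),
               snpSndr.executor()));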

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -0,0 +1,895 @@
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotPrimaryBackupsTheSame() throws Exception {
+        int grids = 3;
+        AtomicInteger cacheKey = new AtomicInteger();
+
+        IgniteEx ignite = startGridsWithCache(grids, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<Long> atLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int gId = rnd.nextInt(grids);
+
+                IgniteCache<Integer, Integer> txCache = grid(gId).getOrCreateCache(dfltCacheCfg.getName());
+
+                try (Transaction tx = grid(gId).transactions().txStart()) {
+                    txCache.put(cacheKey.incrementAndGet(), 0);
+
+                    txCache.put(cacheKey.incrementAndGet(), 1);
+
+                    tx.commit();
+                }
+            }
+        }, 5, "tx-cache-put-");
+
+        IgniteInternalFuture<Long> txLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, Integer> atomicCache = grid(rnd.nextInt(grids))
+                    .getOrCreateCache(atomicCcfg);
+
+                atomicCache.put(cacheKey.incrementAndGet(), 0);
+            }
+        }, 5, "atomic-cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+            fut.get();
+        }
+        finally {
+            txLoadFut.cancel();
+            atLoadFut.cancel();
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Block whole rebalancing.
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).blockMessages((node, msg) -> msg instanceof GridDhtPartitionDemandMessage);
+
+        snpIg0.cluster().state(ACTIVE);
+
+        assertFalse("Primary and backup in snapshot must have the same counters. Rebalance must not happen.",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, REBALANCE_AWAIT_TIME));
+
+        TestRecordingCommunicationSpi.stopBlockAll();
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotConsistencyUnderLoad() throws Exception {
+        int clientsCnt = 50;
+        int balance = 10_000;
+        int transferLimit = 1000;
+        int total = clientsCnt * balance * 2;
+        int grids = 3;
+        int transferThreadCnt = 4;
+        AtomicBoolean stop = new AtomicBoolean(false);
+        CountDownLatch txStarted = new CountDownLatch(1);
+
+        CacheConfiguration<Integer, Account> eastCcfg = txCacheConfig(new CacheConfiguration<>("east"));
+        CacheConfiguration<Integer, Account> westCcfg = txCacheConfig(new CacheConfiguration<>("west"));
+
+        for (int i = 0; i < grids; i++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration(eastCcfg, westCcfg)));
+
+        grid(0).cluster().state(ACTIVE);
+
+        Ignite client = startClientGrid(grids);
+
+        IgniteCache<Integer, Account> eastCache = client.cache(eastCcfg.getName());
+        IgniteCache<Integer, Account> westCache = client.cache(westCcfg.getName());
+
+        // Create clients with initial balance.
+        for (int i = 0; i < clientsCnt; i++) {
+            eastCache.put(i, new Account(i, balance));
+            westCache.put(i, new Account(i, balance));
+        }
+
+        assertEquals("The initial summary value in all caches is not correct.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        forceCheckpoint();
+
+        IgniteInternalFuture<?> txLoadFut = GridTestUtils.runMultiThreadedAsync(
+            () -> {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int amount;
+
+                try {
+                    while (!stop.get()) {
+                        IgniteEx ignite = grid(rnd.nextInt(grids));
+                        IgniteCache<Integer, Account> east = ignite.cache("east");
+                        IgniteCache<Integer, Account> west = ignite.cache("west");
+
+                        amount = rnd.nextInt(transferLimit);
+
+                        txStarted.countDown();
+
+                        try (Transaction tx = ignite.transactions().txStart()) {
+                            Integer id = rnd.nextInt(clientsCnt);
+
+                            Account acc0 = east.get(id);
+                            Account acc1 = west.get(id);
+
+                            acc0.balance -= amount;
+                            acc1.balance += amount;
+
+                            east.put(id, acc0);
+                            west.put(id, acc1);
+
+                            tx.commit();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, e);
+
+                    fail("Tx must not be failed.");
+                }
+            }, transferThreadCnt, "transfer-account-thread-");
+
+        try {
+            U.await(txStarted);
+
+            grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        txLoadFut.get();
+
+        assertEquals("The summary value should not changed during tx transfers.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        assertEquals("The total amount of all cache values must not changed in snapshot.",
+            total, sumAllCacheValues(snpIg0, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithCacheNodeFilter() throws Exception {
+        int grids = 4;
+
+        CacheConfiguration<Integer, Integer> ccfg = txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setNodeFilter(node -> node.consistentId().toString().endsWith("1"));
+
+        for (int i = 0; i < grids; i++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration()));
+
+        IgniteEx ig0 = grid(0);
+
+        ig0.cluster().baselineAutoAdjustEnabled(false);
+        ig0.cluster().state(ACTIVE);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            ig0.getOrCreateCache(ccfg).put(i, i);
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(grids,
+            cfg -> resolveSnapshotWorkDirectory(cfg.setCacheConfiguration()).getAbsolutePath(),
+            SNAPSHOT_NAME,
+            true);
+
+        awaitPartitionMapExchange();
+
+        assertSnapshotCacheKeys(snp.cache(ccfg.getName()));

Review comment:
       Additionally, check that the node filter is preserved.
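
       For example, something along these lines could be appended after the key assertion (a sketch relying on the public IgniteCache.getConfiguration accessor):

           // The restored cache configuration should still carry the configured node filter.
           assertNotNull("Node filter must survive snapshot restore",
               snp.cache(ccfg.getName())
                   .getConfiguration(CacheConfiguration.class)
                   .getNodeFilter());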

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -0,0 +1,1943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
+import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * There are two major actions available:
+ * <ul>
+ *     <li>Create a snapshot of all cluster cache groups by triggering PME to achieve consistency.</li>
+ *     <li>Create a local snapshot of the requested cache groups and send it to the node which requested this operation.
+ *     Cache groups will be transmitted using the internal API for transferring files. See {@link TransmissionHandler}.</li>
+ * </ul>
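+ * <p>
+ * A minimal usage sketch (the {@code Ignite#snapshot()} entry point below matches the way the
+ * tests invoke this API; shown for illustration only):
+ * <pre>{@code
+ * // Create a cluster-wide snapshot and block until it completes.
+ * ignite.snapshot().createSnapshot("backup23012020").get();
+ * }</pre>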
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+    /** Suffix for files with delta pages. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template for partition delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+    /** Reason text for a checkpoint started to enforce a snapshot operation. */
+    public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
+    /** Default temporary snapshot directory for loading remote snapshots and storing delta files. */
+    public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+    /** Timeout in milliseconds for snapshot operations. */
+    public static final long DFLT_SNAPSHOT_TIMEOUT = 15_000L;
+
+    /** Snapshot in progress error message. */
+    public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
+
+    /** Error message to finalize snapshot tasks. */
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled because the local node " +
+        "is stopping";
+
+    /** Metastorage key to save currently running snapshot. */
+    public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+    /** Snapshot metrics prefix. */
+    public static final String SNAPSHOT_METRICS = "snapshot";
+
+    /** Prefix for snapshot threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+    /** Total number of threads to perform local snapshot operations. */
+    private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+    /** Default snapshot topic to receive snapshots from remote node. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of the sender node's directory path with its consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of the cache directory whose partitions are currently being sent. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String SNP_NAME_PARAM = "snpName";
+
+    /** Total number of snapshot files the receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
+    /**
+     * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
+     * It is important to have only one buffer per thread (instead of creating a buffer for each
+     * {@code SnapshotFutureTask.PageStoreSerialWriter}), since extra buffers are redundant and can lead to OOM errors.
+     * A direct buffer is deallocated only when its ByteBuffer is garbage collected, so off-heap memory
+     * can be exhausted before that happens.
+     */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** Map of registered cache snapshot processes and their corresponding contexts. */
+    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+
+    /** Lock to protect the resources in use. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Snapshot requested from a remote node. */
+    private final AtomicReference<RemoteSnapshotFuture> rmtSnpReq = new AtomicReference<>();
+
+    /** Mutex used to order cluster snapshot operation progress. */
+    private final Object snpOpMux = new Object();
+
+    /** Take snapshot operation procedure. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
+
+    /** Check the previously performed snapshot operation and delete uncompleted files if needed. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
+
+    /** Resolved persistent data storage settings. */
+    private volatile PdsFolderSettings pdsSettings;
+
+    /** Fully initialized metastorage. */
+    private volatile ReadWriteMetastorage metaStorage;
+
+    /** Local snapshot sender factory. */
+    private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+
+    /** Main snapshot directory to save created snapshots. */
+    private volatile File locSnpDir;
+
+    /**
+     * Working directory for snapshots loaded from remote nodes and for temporary
+     * partition delta files of a locally started snapshot process.
+     */
+    private File tmpWorkDir;
+
+    /** Factory for working with delta files as file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+    /** Snapshot thread pool to perform local partition snapshots. */
+    private ExecutorService snpRunner;
+
+    /** System discovery message listener. */
+    private DiscoveryEventListener discoLsnr;
+
+    /** Cluster snapshot operation requested by user. */
+    private ClusterSnapshotFuture clusterSnpFut;
+
+    /** Current snapshot operation on local node. */
+    private volatile SnapshotOperationRequest clusterSnpReq;
+
+    /** {@code true} if recovery process occurred for snapshot. */
+    private volatile boolean recovered;
+
+    /** Last seen cluster snapshot operation. */
+    private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        locBuff = ThreadLocal.withInitial(() ->
+            ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize())
+                .order(ByteOrder.nativeOrder()));
+
+        startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage,
+            this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new);
+
+        endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage,
+            this::processLocalSnapshotEndStageResult);
+    }
+
+    /**
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File partDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, partDeltaFileName(partId));
+    }
+
+    /**
+     * @param partId Partition id.
+     * @return File name of delta partition pages.
+     */
+    public static String partDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : String.format(PART_DELTA_TEMPLATE, partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(ctx.config()))
+            return;
+
+        snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THREAD_POOL_SIZE,
+            SNAPSHOT_THREAD_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            new OomExceptionHandler(ctx));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+        locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+        tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true);
+
+        U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+        U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
+
+        MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+        mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+            "The system time of the last cluster snapshot request start time on this node.");
+        mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+            "The system time of the last cluster snapshot request end time on this node.");
+        mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, String.class,
+            "The name of last started cluster snapshot request on this node.");
+        mreg.register("LastSnapshotErrorMessage",
+            () -> lastSeenSnpFut.error() == null ? "" : lastSeenSnpFut.error().getMessage(),
+            String.class,
+            "The error message of last started cluster snapshot request which fail with an error. " +
+                "This value will be empty if last snapshot request has been completed successfully.");
+        mreg.register("LocalSnapshotList", this::getSnapshots, List.class,
+            "The list of names of all snapshots currently saved on the local node with respect to " +
+                "the configured via IgniteConfiguration snapshot working path.");
+
+        storeFactory = storeMgr::getPageStoreFactory;
+
+        cctx.exchange().registerExchangeAwareComponent(this);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+        // Receive remote snapshots requests.
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    if (msg instanceof SnapshotRequestMessage) {
+                        SnapshotRequestMessage reqMsg0 = (SnapshotRequestMessage)msg;
+                        String snpName = reqMsg0.snapshotName();
+
+                        synchronized (this) {
+                            SnapshotFutureTask task = lastScheduledRemoteSnapshotTask(nodeId);
+
+                            if (task != null) {
+                                // Task will also be removed from local map due to the listener on future done.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                            }
+                        }
+
+                        SnapshotFutureTask task = registerSnapshotTask(snpName,
+                            nodeId,
+                            reqMsg0.parts(),
+                            remoteSnapshotSender(snpName, nodeId));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process request of creating a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new SnapshotResponseMessage(reqMsg0.snapshotName(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Fail to send the response message with processing snapshot request " +
+                                    "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    else if (msg instanceof SnapshotResponseMessage) {
+                        SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg;
+
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.snpName.equals(respMsg0.snapshotName())) {
+                            if (log.isInfoEnabled()) {
+                                log.info("A stale snapshot response message has been received. Will be ignored " +
+                                    "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                            }
+
+                            return;
+                        }
+
+                        if (respMsg0.errorMessage() != null) {
+                            fut0.onDone(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                                "on the remote node with an error: " + respMsg0.errorMessage()));
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                    cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                UUID leftNodeId = evt.eventNode().id();
+
+                if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+                    SnapshotOperationRequest snpReq = clusterSnpReq;
+
+                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                        if (sctx.sourceNodeId().equals(leftNodeId) ||
+                            (snpReq != null &&
+                                snpReq.snpName.equals(sctx.snapshotName()) &&
+                                snpReq.bltNodes.contains(leftNodeId))) {
+                            sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
+                                "One of baseline nodes left the cluster: " + leftNodeId));
+                        }
+                    }
+
+                    RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                    if (snpTrFut != null && snpTrFut.rmtNodeId.equals(leftNodeId)) {
+                        snpTrFut.onDone(new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                            "requested left the grid"));
+                    }
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        // Remote snapshot handler.
+        cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, new TransmissionHandler() {
+            @Override public void onEnd(UUID nodeId) {
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                assert snpTrFut.stores.isEmpty() : snpTrFut.stores.entrySet();
+                assert snpTrFut.partsLeft == 0 : snpTrFut;
+
+                snpTrFut.onDone();
+
+                log.info("Requested snapshot from remote node has been fully received " +
+                    "[snpName=" + snpTrFut.snpName + ", snpTrans=" + snpTrFut + ']');
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onException(UUID nodeId, Throwable err) {
+                RemoteSnapshotFuture fut = rmtSnpReq.get();
+
+                if (fut == null)
+                    return;
+
+                if (fut.rmtNodeId.equals(nodeId))
+                    fut.onDone(err);
+            }
+
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+                String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+                String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+                RemoteSnapshotFuture transFut = resolve(nodeId, fileMeta);
+
+                try {
+                    File cacheDir = U.resolveWorkDirectory(tmpWorkDir.getAbsolutePath(),
+                        Paths.get(transFut.snpName, rmtDbNodePath, cacheDirName).toString(),
+                        false);
+
+                    return new File(cacheDir, getPartitionFileName(partId)).getAbsolutePath();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            /**
+             * @param nodeId Remote node id.
+             * @param meta Transmission meta.
+             * @return Resolved transmission future.
+             */
+            private RemoteSnapshotFuture resolve(UUID nodeId, TransmissionMeta meta) {
+                String snpName = (String)meta.params().get(SNP_NAME_PARAM);
+                Integer partsCnt = (Integer)meta.params().get(SNP_PARTITIONS_CNT);
+
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                if (snpTrFut == null || !snpTrFut.snpName.equals(snpName)) {
+                    throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                        "[snpName=" + snpName + ", meta=" + meta + ", snpTrFut=" + snpTrFut + ']');
+                }
+
+                assert snpTrFut.snpName.equals(snpName) && snpTrFut.rmtNodeId.equals(nodeId) :
+                    "Another transmission in progress [snpTrFut=" + snpTrFut + ", nodeId=" + nodeId + ']';
+
+                if (snpTrFut.partsLeft == -1)
+                    snpTrFut.partsLeft = partsCnt;
+
+                return snpTrFut;
+            }
+
+            /**
+             * @param snpTrans Current snapshot transmission.
+             * @param grpPartId Pair of group id and its partition id.
+             */
+            private void finishRecover(RemoteSnapshotFuture snpTrans, GroupPartitionId grpPartId) {
+                FilePageStore pageStore = null;
+
+                try {
+                    pageStore = snpTrans.stores.remove(grpPartId);
+
+                    pageStore.finishRecover();
+
+                    snpTrans.partConsumer.accept(new File(pageStore.getFileAbsolutePath()), grpPartId);
+
+                    snpTrans.partsLeft--;
+                }
+                catch (StorageException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    U.closeQuiet(pageStore);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+
+                RemoteSnapshotFuture snpTrFut = resolve(nodeId, initMeta);
+
+                GroupPartitionId grpPartId = new GroupPartitionId(grpId, partId);
+                FilePageStore pageStore = snpTrFut.stores.get(grpPartId);
+
+                if (pageStore == null) {
+                    throw new IgniteException("Partition must be loaded before applying snapshot delta pages " +
+                        "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                pageStore.beginRecover();
+
+                // No snapshot delta pages received. Finalize recovery.
+                if (initMeta.count() == 0)
+                    finishRecover(snpTrFut, grpPartId);
+
+                return new Consumer<ByteBuffer>() {
+                    final LongAdder transferred = new LongAdder();
+
+                    @Override public void accept(ByteBuffer buff) {
+                        try {
+                            assert initMeta.count() != 0 : initMeta;
+
+                            RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                            if (fut0 == null || !fut0.equals(snpTrFut) || fut0.isCancelled()) {
+                                throw new TransmissionCancelledException("Snapshot request is cancelled " +
+                                    "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                            }
+
+                            pageStore.write(PageIO.getPageId(buff), buff, 0, false);
+
+                            transferred.add(buff.capacity());
+
+                            if (transferred.longValue() == initMeta.count())
+                                finishRecover(snpTrFut, grpPartId);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                };
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+                String snpName = (String)initMeta.params().get(SNP_NAME_PARAM);
+
+                assert grpId != null;
+                assert partId != null;
+                assert snpName != null;
+                assert storeFactory != null;
+
+                RemoteSnapshotFuture transFut = rmtSnpReq.get();
+
+                if (transFut == null) {
+                    throw new IgniteException("Snapshot transmission with given name doesn't exists " +
+                        "[snpName=" + snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.equals(transFut) || fut0.isCancelled()) {
+                            throw new TransmissionCancelledException("Snapshot request is cancelled [snpName=" + snpName +
+                                ", grpId=" + grpId + ", partId=" + partId + ']');
+                        }
+
+                        busyLock.enterBusy();
+
+                        try {
+                            FilePageStore pageStore = (FilePageStore)storeFactory
+                                .apply(grpId, false)
+                                .createPageStore(getFlagByPartId(partId),
+                                    file::toPath,
+                                    new LongAdderMetric("NO_OP", null));
+
+                            transFut.stores.put(new GroupPartitionId(grpId, partId), pageStore);
+
+                            pageStore.init();
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+                };
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        busyLock.block();
+
+        try {
+            // Try to stop all snapshot processing if it has not been stopped yet.
+            for (SnapshotFutureTask sctx : locSnpTasks.values())
+                sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+            locSnpTasks.clear();
+
+            RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+            if (snpTrFut != null)
+                snpTrFut.cancel();
+
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null) {
+                    clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+                    clusterSnpFut = null;
+                }
+            }
+
+            if (snpRunner != null)
+                snpRunner.shutdownNow();
+
+            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
+            if (discoLsnr != null)
+                cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+
+            cctx.exchange().unregisterExchangeAwareComponent(this);
+        }
+        finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * @param snpDir Snapshot dir.
+     * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
+     */
+    public static void deleteSnapshot(File snpDir, String folderName) {
+        if (!snpDir.exists())
+            return;
+
+        assert snpDir.isDirectory() : snpDir;
+
+        try {
+            File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName);
+            File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), databaseRelativePath(folderName), false);
+
+            U.delete(binDir);
+            U.delete(dbDir);
+
+            File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+            // Traverse the snapshot marshaller directory and delete all files, tolerating concurrent removals.
+            Files.walkFileTree(marshDir.toPath(), new SimpleFileVisitor<Path>() {
+                @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    U.delete(file);
+
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override public FileVisitResult visitFileFailed(Path file, IOException exc) {
+                    // Skip files which can be concurrently removed from FileTree.
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+
+            File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+            if (!db.exists() || db.list().length == 0)
+                U.delete(snpDir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return Local snapshot directory for snapshot with given name.
+     */
+    public File snapshotLocalDir(String snpName) {
+        assert locSnpDir != null;
+
+        return new File(locSnpDir, snpName);
+    }
+
+    /**
+     * @return Node snapshot temporary working directory.
+     */
+    public File snapshotTmpDir() {
+        assert tmpWorkDir != null;
+
+        return tmpWorkDir;
+    }
+
+    /**
+     * @param req Request on snapshot creation.
+     * @return Future which will be completed when a snapshot has been started.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartStage(SnapshotOperationRequest req) {
+        if (cctx.kernalContext().clientNode() ||
+            !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
+            return new GridFinishedFuture<>();
+
+        // Executed inside discovery notifier thread, prior to firing discovery custom event,
+        // so it is safe to set new snapshot task inside this method without synchronization.
+        if (clusterSnpReq != null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " +
+                "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
+        }
+
+        Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+        leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+            F.node2id()));
+
+        if (!leftNodes.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of baseline nodes left the cluster " +
+                "prior to snapshot operation start: " + leftNodes));
+        }
+
+        Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+        leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+
+        if (!leftGrps.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
+                "on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
+        }
+
+        Map<Integer, Set<Integer>> parts = new HashMap<>();
+
+        // Prepare a collection of (cache group, partition set) pairs to be snapshotted.
+        // Cache group context may be 'null' on some nodes, e.g. when a node filter is set.
+        for (Integer grpId : req.grpIds) {
+            if (cctx.cache().cacheGroup(grpId) == null)
+                continue;
+
+            parts.put(grpId, null);
+        }
+
+        if (parts.isEmpty())
+            return new GridFinishedFuture<>();
+
+        SnapshotFutureTask task0 = registerSnapshotTask(req.snpName,
+            req.srcNodeId,
+            parts,
+            locSndrFactory.apply(req.snpName));
+
+        clusterSnpReq = req;
+
+        return task0.chain(fut -> {
+            if (fut.error() == null)
+                return new SnapshotOperationResponse();
+            else
+                throw new GridClosureException(fut.error());
+        });
+    }
+
+    /**
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        if (cctx.kernalContext().clientNode())
+            return;
+
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null || !snpReq.rqId.equals(id)) {
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot operation has not been fully completed " +
+                        "[err=" + err + ", snpReq=" + snpReq + ']'));
+
+                    clusterSnpFut = null;
+                }
+
+                return;
+            }
+        }
+
+        if (isLocalNodeCoordinator(cctx.discovery())) {
+            Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+            missed.removeAll(res.keySet());
+            missed.removeAll(err.keySet());
+
+            snpReq.hasErr = !F.isEmpty(err) || !missed.isEmpty();
+
+            if (snpReq.hasErr) {
+                U.warn(log, "Execution of local snapshot tasks fails or them haven't been executed " +
+                    "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
+                    "[err=" + err + ", missed=" + missed + ']');
+            }
+
+            endSnpProc.start(UUID.randomUUID(), snpReq);
+        }
+    }
+
+    /**
+     * @param req Request on snapshot creation.
+     * @return Future which will be completed when the snapshot is finalized.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
+        if (clusterSnpReq == null)
+            return new GridFinishedFuture<>(new SnapshotOperationResponse());
+
+        try {
+            if (req.hasErr)
+                deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+
+            removeLastMetaStorageKey();
+        }
+        catch (Exception e) {
+            return new GridFinishedFuture<>(e);
+        }
+
+        return new GridFinishedFuture<>(new SnapshotOperationResponse());
+    }
+
+    /**
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null)
+            return;
+
+        Set<UUID> endFail = new HashSet<>(snpReq.bltNodes);
+        endFail.removeAll(res.keySet());
+
+        clusterSnpReq = null;
+
+        synchronized (snpOpMux) {
+            if (clusterSnpFut != null) {
+                if (endFail.isEmpty() && !snpReq.hasErr) {
+                    clusterSnpFut.onDone();
+
+                    if (log.isInfoEnabled())
+                        log.info("Cluster-wide snapshot operation finished successfully [req=" + snpReq + ']');
+                }
+                else {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
+                        "Local snapshot tasks may not finished completely or finalizing results fails " +
+                        "[hasErr=" + snpReq.hasErr + ", fail=" + endFail + ", err=" + err + ']'));
+                }
+
+                clusterSnpFut = null;
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if snapshot operation is in progress.
+     */
+    public boolean isSnapshotCreating() {
+        if (clusterSnpReq != null)
+            return true;
+
+        synchronized (snpOpMux) {
+            return clusterSnpReq != null || clusterSnpFut != null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> getSnapshots() {
+        if (cctx.kernalContext().clientNode())
+            throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+        synchronized (snpOpMux) {
+            return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+                .map(File::getName)
+                .collect(Collectors.toList());
+        }

Review comment:
       If a baseline node was offline during the snapshot creation, the result of this call on the node will not return the snapshot which was successfully created on other nodes, which will be really confusing for the user. We also need to add a test with snapshot creation with offline nodes. 
   
   I would rework this logic to collect information from all nodes in the cluster and merge it. Also, why not support clients by sending a compute to server nodes?
   
   If we want to keep this behavior, we should rename the method to getSnapshotsLocal, because the current combination of methods in the interface has different semantics.
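   
   For reference, here is a minimal sketch of the merged variant (the method name getSnapshotsClusterWide is illustrative only, and the per-node call is assumed to keep its current local-listing semantics; imports needed: java.util.Collection, java.util.List, java.util.stream.Collectors, org.apache.ignite.Ignite, org.apache.ignite.lang.IgniteCallable, org.apache.ignite.resources.IgniteInstanceResource):
   
       public List<String> getSnapshotsClusterWide(Ignite ignite) {
           // Ask every server node for the snapshot names it stores locally.
           Collection<List<String>> perNode = ignite.compute(ignite.cluster().forServers())
               .broadcast(new IgniteCallable<List<String>>() {
                   /** Ignite instance auto-injected on the target node. */
                   @IgniteInstanceResource
                   private transient Ignite ig;
   
                   @Override public List<String> call() {
                       return ig.snapshot().getSnapshots();
                   }
               });
   
           // Merge and deduplicate the names reported by individual nodes.
           return perNode.stream()
               .flatMap(Collection::stream)
               .distinct()
               .sorted()
               .collect(Collectors.toList());
       }
   
   This would also make the call work from client nodes, since the broadcast is mapped to server nodes only.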

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
##########
@@ -0,0 +1,895 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.FullMessage;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Cluster-wide snapshot test.
+ */
+public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
+    /** Time to wait while rebalance may happen. */
+    private static final long REBALANCE_AWAIT_TIME = GridTestUtils.SF.applyLB(10_000, 3_000);
+
+    /** Cache configuration for test. */
+    private static CacheConfiguration<Integer, Integer> atomicCcfg = new CacheConfiguration<Integer, Integer>("atomicCacheName")
+        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+        .setBackups(2)
+        .setAffinity(new RendezvousAffinityFunction(false)
+            .setPartitions(CACHE_PARTS_COUNT));
+
+    /** {@code true} if the node should be started in a separate JVM. */
+    protected volatile boolean jvm;
+
+    /** @throws Exception If fails. */
+    @Before
+    @Override public void beforeTestSnapshot() throws Exception {
+        super.beforeTestSnapshot();
+
+        jvm = false;
+    }
+
+    /**
+     * Take a snapshot of the whole cluster and check snapshot consistency when
+     * cluster tx load starts on a new topology version.
+     * Note: Client nodes and server nodes outside the baseline topology must not be affected.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testConsistentClusterSnapshotLoadNewTopology() throws Exception {
+        int grids = 3;
+        String snpName = "backup23012020";
+        AtomicInteger atKey = new AtomicInteger(CACHE_KEYS_RANGE);
+        AtomicInteger txKey = new AtomicInteger(CACHE_KEYS_RANGE);
+
+        IgniteEx ignite = startGrids(grids);
+        startClientGrid();
+
+        ignite.cluster().baselineAutoAdjustEnabled(false);
+        ignite.cluster().state(ACTIVE);
+
+        // Start node not in baseline.
+        IgniteEx notBltIgnite = startGrid(grids);
+        File locSnpDir = snp(notBltIgnite).snapshotLocalDir(SNAPSHOT_NAME);
+        String notBltDirName = folderName(notBltIgnite);
+
+        IgniteCache<Integer, Integer> atCache = ignite.createCache(atomicCcfg);
+
+        for (int idx = 0; idx < CACHE_KEYS_RANGE; idx++) {
+            atCache.put(atKey.incrementAndGet(), -1);
+            ignite.cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), -1);
+        }
+
+        forceCheckpoint();
+
+        CountDownLatch loadLatch = new CountDownLatch(1);
+
+        ignite.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            /** {@inheritDoc} */
+            @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                // First discovery custom event will be a snapshot operation.
+                assertTrue(isSnapshotOperation(fut.firstEvent()));
+                assertTrue("Snapshot must use pme-free exchange", fut.context().exchangeFreeSwitch());
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)fut.firstEvent()).customMessage();
+
+                assertNotNull(msg);
+
+                if (msg instanceof SnapshotDiscoveryMessage)
+                    loadLatch.countDown();
+            }
+        });
+
+        // Start cache load.
+        IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(loadLatch);
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int atIdx = rnd.nextInt(grids);
+
+                    // Zero out the sign bit.
+                    grid(atIdx).cache(atomicCcfg.getName()).put(atKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+
+                    int txIdx = rnd.nextInt(grids);
+
+                    grid(txIdx).cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new RuntimeException(e);
+            }
+        }, 3, "cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(snpName);
+
+            U.await(loadLatch, 10, TimeUnit.SECONDS);
+
+            fut.get();
+        }
+        finally {
+            loadFut.cancel();
+        }
+
+        // The cluster could be deactivated gracefully, but we also want to test snapshot restore after binary recovery.
+        stopAllGrids();
+
+        assertTrue("Snapshot directory must be empty for node not in baseline topology: " + notBltDirName,
+            !searchDirectoryRecursively(locSnpDir.toPath(), notBltDirName).isPresent());
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, snpName);
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + DEFAULT_CACHE_NAME,
+            CACHE_KEYS_RANGE, snpIg0.cache(DEFAULT_CACHE_NAME).size());
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + atomicCcfg.getName(),
+            CACHE_KEYS_RANGE, snpIg0.cache(atomicCcfg.getName()).size());
+
+        snpIg0.cache(DEFAULT_CACHE_NAME).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + DEFAULT_CACHE_NAME + ", entry=" + e +']', (Integer)e.getValue() < 0));
+
+        snpIg0.cache(atomicCcfg.getName()).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + atomicCcfg.getName() + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotPrimaryBackupsTheSame() throws Exception {
+        int grids = 3;
+        AtomicInteger cacheKey = new AtomicInteger();
+
+        IgniteEx ignite = startGridsWithCache(grids, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteInternalFuture<Long> txLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int gId = rnd.nextInt(grids);
+
+                IgniteCache<Integer, Integer> txCache = grid(gId).getOrCreateCache(dfltCacheCfg.getName());
+
+                try (Transaction tx = grid(gId).transactions().txStart()) {
+                    txCache.put(cacheKey.incrementAndGet(), 0);
+
+                    txCache.put(cacheKey.incrementAndGet(), 1);
+
+                    tx.commit();
+                }
+            }
+        }, 5, "tx-cache-put-");
+
+        IgniteInternalFuture<Long> atLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                IgniteCache<Integer, Integer> atomicCache = grid(rnd.nextInt(grids))
+                    .getOrCreateCache(atomicCcfg);
+
+                atomicCache.put(cacheKey.incrementAndGet(), 0);
+            }
+        }, 5, "atomic-cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+            fut.get();
+        }
+        finally {
+            txLoadFut.cancel();
+            atLoadFut.cancel();
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Block whole rebalancing.
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).blockMessages((node, msg) -> msg instanceof GridDhtPartitionDemandMessage);
+
+        snpIg0.cluster().state(ACTIVE);
+
+        assertFalse("Primary and backup in snapshot must have the same counters. Rebalance must not happen.",
+            GridTestUtils.waitForCondition(() -> {
+                boolean hasMsgs = false;
+
+                for (Ignite g : G.allGrids())
+                    hasMsgs |= TestRecordingCommunicationSpi.spi(g).hasBlockedMessages();
+
+                return hasMsgs;
+            }, REBALANCE_AWAIT_TIME));
+
+        TestRecordingCommunicationSpi.stopBlockAll();
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotConsistencyUnderLoad() throws Exception {
+        int clientsCnt = 50;
+        int balance = 10_000;
+        int transferLimit = 1000;
+        int total = clientsCnt * balance * 2;
+        int grids = 3;
+        int transferThreadCnt = 4;
+        AtomicBoolean stop = new AtomicBoolean(false);
+        CountDownLatch txStarted = new CountDownLatch(1);
+
+        CacheConfiguration<Integer, Account> eastCcfg = txCacheConfig(new CacheConfiguration<>("east"));
+        CacheConfiguration<Integer, Account> westCcfg = txCacheConfig(new CacheConfiguration<>("west"));
+
+        for (int i = 0; i < grids; i++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration(eastCcfg, westCcfg)));
+
+        grid(0).cluster().state(ACTIVE);
+
+        Ignite client = startClientGrid(grids);
+
+        IgniteCache<Integer, Account> eastCache = client.cache(eastCcfg.getName());
+        IgniteCache<Integer, Account> westCache = client.cache(westCcfg.getName());
+
+        // Create clients with initial balance.
+        for (int i = 0; i < clientsCnt; i++) {
+            eastCache.put(i, new Account(i, balance));
+            westCache.put(i, new Account(i, balance));
+        }
+
+        assertEquals("The initial summary value in all caches is not correct.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        forceCheckpoint();
+
+        IgniteInternalFuture<?> txLoadFut = GridTestUtils.runMultiThreadedAsync(
+            () -> {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int amount;
+
+                try {
+                    while (!stop.get()) {
+                        IgniteEx ignite = grid(rnd.nextInt(grids));
+                        IgniteCache<Integer, Account> east = ignite.cache("east");
+                        IgniteCache<Integer, Account> west = ignite.cache("west");
+
+                        amount = rnd.nextInt(transferLimit);
+
+                        txStarted.countDown();
+
+                        try (Transaction tx = ignite.transactions().txStart()) {
+                            Integer id = rnd.nextInt(clientsCnt);
+
+                            Account acc0 = east.get(id);
+                            Account acc1 = west.get(id);
+
+                            acc0.balance -= amount;
+                            acc1.balance += amount;
+
+                            east.put(id, acc0);
+                            west.put(id, acc1);
+
+                            tx.commit();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, e);
+
+                    fail("Tx must not be failed.");
+                }
+            }, transferThreadCnt, "transfer-account-thread-");
+
+        try {
+            U.await(txStarted);
+
+            grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        txLoadFut.get();
+
+        assertEquals("The summary value should not changed during tx transfers.",
+            total, sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        assertEquals("The total amount of all cache values must not changed in snapshot.",
+            total, sumAllCacheValues(snpIg0, clientsCnt, eastCcfg.getName(), westCcfg.getName()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithCacheNodeFilter() throws Exception {
+        int grids = 4;
+
+        CacheConfiguration<Integer, Integer> ccfg = txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setNodeFilter(node -> node.consistentId().toString().endsWith("1"));
+
+        for (int i = 0; i < grids; i++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration()));
+
+        IgniteEx ig0 = grid(0);
+
+        ig0.cluster().baselineAutoAdjustEnabled(false);
+        ig0.cluster().state(ACTIVE);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            ig0.getOrCreateCache(ccfg).put(i, i);
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(grids,
+            cfg -> resolveSnapshotWorkDirectory(cfg.setCacheConfiguration()).getAbsolutePath(),
+            SNAPSHOT_NAME,
+            true);
+
+        awaitPartitionMapExchange();
+
+        assertSnapshotCacheKeys(snp.cache(ccfg.getName()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testRejectCacheStopDuringClusterSnapshot() throws Exception {
+        // Block the full message so that the cluster-wide snapshot operation cannot be fully completed.
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> {
+            if (msg instanceof FullMessage) {
+                FullMessage<?> msg0 = (FullMessage<?>)msg;
+
+                assertEquals("Snapshot distributed process must be used",
+                    DistributedProcess.DistributedProcessType.START_SNAPSHOT.ordinal(), msg0.type());
+
+                assertTrue("Snapshot has to be finished successfully on all nodes", msg0.error().isEmpty());
+
+                return true;
+            }
+
+            return false;
+        });
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // Creating new caches should not be blocked.
+        ignite.getOrCreateCache(dfltCacheCfg.setName("default2"))
+            .put(1, 1);
+
+        forceCheckpoint();
+
+        assertThrowsAnyCause(log,
+            () -> {
+                ignite.destroyCache(DEFAULT_CACHE_NAME);
+
+                return 0;
+            },
+            IgniteCheckedException.class,
+            SNP_IN_PROGRESS_ERR_MSG);
+
+        spi.unblock();
+
+        fut.get();
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testBltChangeDuringClusterSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        startGrid(3);
+
+        long topVer = ignite.cluster().topologyVersion();
+
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // A non-baseline node joins successfully.
+        String grid4Dir = folderName(startGrid(4));
+
+        // The non-baseline node leaves the cluster; the snapshot is not affected.
+        stopGrid(4);
+
+        // Client node must connect successfully.
+        startClientGrid(4);
+
+        // Changing the baseline completes successfully.
+        ignite.cluster().setBaselineTopology(topVer);
+
+        spi.unblock();
+
+        fut.get();
+
+        assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + grid4Dir,
+            !searchDirectoryRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), grid4Dir).isPresent());
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        ignite.close();
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            NodeStoppingException.class,
+            SNP_NODE_STOPPING_ERR_MSG);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotExistsException() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        assertThrowsAnyCause(log,
+            () -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
+            IgniteException.class,
+            "Snapshot with given name already exists.");
+
+        stopAllGrids();
+
+        // Check that snapshot has not been accidentally deleted.
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotCleanedOnLeft() throws Exception {
+        CountDownLatch block = new CountDownLatch(1);
+        CountDownLatch partProcessed = new CountDownLatch(1);
+
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
+        String dirNameIgnite0 = folderName(ignite);
+
+        String dirNameIgnite1 = folderName(grid(1));
+
+        snp(grid(1)).localSnapshotSenderFactory(
+            blockingLocalSnapshotSender(grid(1), partProcessed, block));
+
+        TestRecordingCommunicationSpi commSpi1 = TestRecordingCommunicationSpi.spi(grid(1));
+        commSpi1.blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+
+        IgniteFuture<?> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        U.await(partProcessed);
+
+        stopGrid(1);
+
+        block.countDown();
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteCheckedException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertTrue("Snapshot directory must be empty for node 0 due to snapshot future fail: " + dirNameIgnite0,
+            !searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite0).isPresent());
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // Snapshot directory must be cleaned.
+        assertTrue("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
+            !searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent());
+
+        List<String> allSnapshots = snp(ignite).getSnapshots();
+
+        assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots,
+            allSnapshots.isEmpty());
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testRecoveryClusterSnapshotJvmHalted() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        String grid0Dir = folderName(ignite);
+        String grid1Dir = folderName(grid(1));
+        File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
+
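+        // Assumption: the 'jvm' flag switches the test to multi-JVM mode, so the next grid starts
+        // in a separate process and halting its JVM does not kill the test process itself.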
+        jvm = true;
+
+        IgniteConfiguration cfg2 = optimize(getConfiguration(getTestIgniteInstanceName(2)));
+
+        cfg2.getDataStorageConfiguration()
+            .setFileIOFactory(new HaltJvmFileIOFactory(new RandomAccessFileIOFactory(),
+                (Predicate<File> & Serializable) file -> {
+                    // Trying to create FileIO over partition file.
+                    return file.getAbsolutePath().contains(SNAPSHOT_NAME);
+                }));
+
+        startGrid(cfg2);
+
+        String grid2Dir = U.maskForFileName(cfg2.getConsistentId().toString());
+
+        jvm = false;
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        awaitPartitionMapExchange();
+
+        assertThrowsAnyCause(log,
+            () -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
+            IgniteCheckedException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertTrue("Snapshot directory must be empty: " + grid0Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid0Dir).isPresent());
+
+        assertTrue("Snapshot directory must be empty: " + grid1Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid1Dir).isPresent());
+
+        assertTrue("Snapshot directory must exist due to grid2 has been halted and cleanup not fully performed: " + grid2Dir,
+            searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        IgniteEx grid2 = startGrid(2);
+
+        assertTrue("Snapshot directory must be empty after recovery: " + grid2Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Snapshot directory must be empty", grid2.snapshot().getSnapshots().isEmpty());
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithRebalancing() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi commSpi = TestRecordingCommunicationSpi.spi(ignite);
+        commSpi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+
+        startGrid(2);
+
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        commSpi.waitForBlocked();
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        commSpi.stopBlock(true);
+
+        fut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));

Review comment:
       We need to check that rebalancing has actually finished and that affinity reassignment is actually triggered after the snapshot restore.
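
       A minimal sketch of such a check, reusing helpers already present in this suite (G.allGrids(), REBALANCE_AWAIT_TIME); the exact assertion points are up to you:

           // Wait for rebalancing of the restored cache to finish on every node;
           // IgniteCache#rebalance() returns a future that completes when the
           // rebalancing of this cache on the given node is done.
           for (Ignite g : G.allGrids())
               g.cache(DEFAULT_CACHE_NAME).rebalance().get(REBALANCE_AWAIT_TIME);

           // Late affinity assignment is applied on a subsequent exchange,
           // so wait for it before asserting partition states.
           awaitPartitionMapExchange(true, true, null);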

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -0,0 +1,1943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
+import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * There are two major actions available:
+ * <ul>
+ *     <li>Create snapshot of the whole cluster cache groups by triggering PME to achieve consistency.</li>
+ *     <li>Create local snapshot of requested cache groups and send it to the node which request this operation.
+ *     Cache groups will be transmitted using internal API for transferring files. See {@link TransmissionHandler}.</li>
+ * </ul>
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+    /** Suffix of files with delta pages. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template for partition files with delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+    /** Reason text for the checkpoint that is forced to start a snapshot operation. */
+    public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
+    /** Default temporary snapshot directory for loading remote snapshots and storing delta files. */
+    public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+    /** Timeout in milliseconds for snapshot operations. */
+    public static final long DFLT_SNAPSHOT_TIMEOUT = 15_000L;
+
+    /** Snapshot in progress error message. */
+    public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected because a snapshot operation is in progress.";
+
+    /** Error message used to finalize snapshot tasks. */
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled because the local node " +
+        "is stopping";
+
+    /** Metastorage key to save currently running snapshot. */
+    public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+    /** Snapshot metrics prefix. */
+    public static final String SNAPSHOT_METRICS = "snapshot";
+
+    /** Prefix for snapshot threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+    /** Total number of threads to perform local snapshot operations. */
+    private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+    /** Default snapshot topic to receive snapshots from remote nodes. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of the cache directory whose partitions are currently being sent. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String SNP_NAME_PARAM = "snpName";
+
+    /** Total snapshot files count which receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
+    /**
+     * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
+     * It is important to have only one buffer per thread (instead of creating a buffer for
+     * each {@code SnapshotFutureTask.PageStoreSerialWriter}) since that is redundant and can lead to OOM errors.
+     * A direct buffer is deallocated only when its ByteBuffer is garbage collected, so off-heap memory
+     * can be exhausted before that happens.
+     */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** Map of registered cache snapshot processes and their corresponding contexts. */
+    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+
+    /** Lock to protect the resources in use. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Requested snapshot from remote node. */
+    private final AtomicReference<RemoteSnapshotFuture> rmtSnpReq = new AtomicReference<>();
+
+    /** Mutex used to order cluster snapshot operation progress. */
+    private final Object snpOpMux = new Object();
+
+    /** Take snapshot operation procedure. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
+
+    /** Checks the previously performed snapshot operation and deletes uncompleted files if needed. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
+
+    /** Resolved persistent data storage settings. */
+    private volatile PdsFolderSettings pdsSettings;
+
+    /** Fully initialized metastorage. */
+    private volatile ReadWriteMetastorage metaStorage;
+
+    /** Local snapshot sender factory. */
+    private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+
+    /** Main snapshot directory to save created snapshots. */
+    private volatile File locSnpDir;
+
+    /**
+     * Working directory for loaded snapshots from the remote nodes and storing
+     * temporary partition delta-files of locally started snapshot process.
+     */
+    private File tmpWorkDir;
+
+    /** Factory for working with delta files as file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+    /** Snapshot thread pool to perform local partition snapshots. */
+    private ExecutorService snpRunner;
+
+    /** System discovery message listener. */
+    private DiscoveryEventListener discoLsnr;
+
+    /** Cluster snapshot operation requested by user. */
+    private ClusterSnapshotFuture clusterSnpFut;
+
+    /** Current snapshot operation on local node. */
+    private volatile SnapshotOperationRequest clusterSnpReq;
+
+    /** {@code true} if recovery process occurred for snapshot. */
+    private volatile boolean recovered;
+
+    /** Last seen cluster snapshot operation. */
+    private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        locBuff = ThreadLocal.withInitial(() ->
+            ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize())
+                .order(ByteOrder.nativeOrder()));
+
+        startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage,
+            this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new);
+
+        endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage,
+            this::processLocalSnapshotEndStageResult);
+    }
+
+    /**
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File partDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, partDeltaFileName(partId));
+    }
+
+    /**
+     * @param partId Partition id.
+     * @return File name of delta partition pages.
+     */
+    public static String partDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : String.format(PART_DELTA_TEMPLATE, partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(ctx.config()))
+            return;
+
+        snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THREAD_POOL_SIZE,
+            SNAPSHOT_THREAD_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            new OomExceptionHandler(ctx));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+        locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+        tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true);
+
+        U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+        U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
+
+        MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+        mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+            "The system time of the last cluster snapshot request start time on this node.");
+        mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+            "The system time of the last cluster snapshot request end time on this node.");
+        mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, String.class,
+            "The name of last started cluster snapshot request on this node.");
+        mreg.register("LastSnapshotErrorMessage",
+            () -> lastSeenSnpFut.error() == null ? "" : lastSeenSnpFut.error().getMessage(),
+            String.class,
+            "The error message of last started cluster snapshot request which fail with an error. " +
+                "This value will be empty if last snapshot request has been completed successfully.");
+        mreg.register("LocalSnapshotList", this::getSnapshots, List.class,
+            "The list of names of all snapshots currently saved on the local node with respect to " +
+                "the configured via IgniteConfiguration snapshot working path.");
+
+        storeFactory = storeMgr::getPageStoreFactory;
+
+        cctx.exchange().registerExchangeAwareComponent(this);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+        // Receive remote snapshots requests.
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    if (msg instanceof SnapshotRequestMessage) {
+                        SnapshotRequestMessage reqMsg0 = (SnapshotRequestMessage)msg;
+                        String snpName = reqMsg0.snapshotName();
+
+                        synchronized (this) {
+                            SnapshotFutureTask task = lastScheduledRemoteSnapshotTask(nodeId);
+
+                            if (task != null) {
+                                // The task will also be removed from the local map by the listener on future completion.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                            }
+                        }
+
+                        SnapshotFutureTask task = registerSnapshotTask(snpName,
+                            nodeId,
+                            reqMsg0.parts(),
+                            remoteSnapshotSender(snpName, nodeId));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process request of creating a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new SnapshotResponseMessage(reqMsg0.snapshotName(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Fail to send the response message with processing snapshot request " +
+                                    "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    else if (msg instanceof SnapshotResponseMessage) {
+                        SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg;
+
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.snpName.equals(respMsg0.snapshotName())) {
+                            if (log.isInfoEnabled()) {
+                                log.info("A stale snapshot response message has been received. Will be ignored " +
+                                    "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                            }
+
+                            return;
+                        }
+
+                        if (respMsg0.errorMessage() != null) {
+                            fut0.onDone(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                                "on the remote node with an error: " + respMsg0.errorMessage()));
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                    cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                UUID leftNodeId = evt.eventNode().id();
+
+                if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+                    SnapshotOperationRequest snpReq = clusterSnpReq;
+
+                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                        if (sctx.sourceNodeId().equals(leftNodeId) ||
+                            (snpReq != null &&
+                                snpReq.snpName.equals(sctx.snapshotName()) &&
+                                snpReq.bltNodes.contains(leftNodeId))) {
+                            sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
+                                "One of baseline nodes left the cluster: " + leftNodeId));
+                        }
+                    }
+
+                    RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                    if (snpTrFut != null && snpTrFut.rmtNodeId.equals(leftNodeId)) {
+                        snpTrFut.onDone(new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                            "requested left the grid"));
+                    }
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        // Remote snapshot handler.
+        cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, new TransmissionHandler() {
+            @Override public void onEnd(UUID nodeId) {
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                assert snpTrFut.stores.isEmpty() : snpTrFut.stores.entrySet();
+                assert snpTrFut.partsLeft == 0 : snpTrFut;
+
+                snpTrFut.onDone();
+
+                log.info("Requested snapshot from remote node has been fully received " +
+                    "[snpName=" + snpTrFut.snpName + ", snpTrans=" + snpTrFut + ']');
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onException(UUID nodeId, Throwable err) {
+                RemoteSnapshotFuture fut = rmtSnpReq.get();
+
+                if (fut == null)
+                    return;
+
+                if (fut.rmtNodeId.equals(nodeId))
+                    fut.onDone(err);
+            }
+
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+                String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+                String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+                RemoteSnapshotFuture transFut = resolve(nodeId, fileMeta);
+
+                try {
+                    File cacheDir = U.resolveWorkDirectory(tmpWorkDir.getAbsolutePath(),
+                        Paths.get(transFut.snpName, rmtDbNodePath, cacheDirName).toString(),
+                        false);
+
+                    return new File(cacheDir, getPartitionFileName(partId)).getAbsolutePath();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            /**
+             * @param nodeId Remote node id.
+             * @param meta Transmission meta.
+             * @return Resolved transmission future.
+             */
+            private RemoteSnapshotFuture resolve(UUID nodeId, TransmissionMeta meta) {
+                String snpName = (String)meta.params().get(SNP_NAME_PARAM);
+                Integer partsCnt = (Integer)meta.params().get(SNP_PARTITIONS_CNT);
+
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                if (snpTrFut == null || !snpTrFut.snpName.equals(snpName)) {
+                    throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                        "[snpName=" + snpName + ", meta=" + meta + ", snpTrFut=" + snpTrFut + ']');
+                }
+
+                assert snpTrFut.snpName.equals(snpName) && snpTrFut.rmtNodeId.equals(nodeId) :
+                    "Another transmission in progress [snpTrFut=" + snpTrFut + ", nodeId=" + snpName + ']';
+
+                if (snpTrFut.partsLeft == -1)
+                    snpTrFut.partsLeft = partsCnt;
+
+                return snpTrFut;
+            }
+
+            /**
+             * @param snpTrans Current snapshot transmission.
+             * @param grpPartId Pair of group id and its partition id.
+             */
+            private void finishRecover(RemoteSnapshotFuture snpTrans, GroupPartitionId grpPartId) {
+                FilePageStore pageStore = null;
+
+                try {
+                    pageStore = snpTrans.stores.remove(grpPartId);
+
+                    pageStore.finishRecover();
+
+                    snpTrans.partConsumer.accept(new File(pageStore.getFileAbsolutePath()), grpPartId);
+
+                    snpTrans.partsLeft--;
+                }
+                catch (StorageException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    U.closeQuiet(pageStore);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+
+                RemoteSnapshotFuture snpTrFut = resolve(nodeId, initMeta);
+
+                GroupPartitionId grpPartId = new GroupPartitionId(grpId, partId);
+                FilePageStore pageStore = snpTrFut.stores.get(grpPartId);
+
+                if (pageStore == null) {
+                    throw new IgniteException("Partition must be loaded before applying snapshot delta pages " +
+                        "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                pageStore.beginRecover();
+
+                // No snapshot delta pages received. Finalize recovery.
+                if (initMeta.count() == 0)
+                    finishRecover(snpTrFut, grpPartId);
+
+                return new Consumer<ByteBuffer>() {
+                    final LongAdder transferred = new LongAdder();
+
+                    @Override public void accept(ByteBuffer buff) {
+                        try {
+                            assert initMeta.count() != 0 : initMeta;
+
+                            RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                            if (fut0 == null || !fut0.equals(snpTrFut) || fut0.isCancelled()) {
+                                throw new TransmissionCancelledException("Snapshot request is cancelled " +
+                                    "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                            }
+
+                            pageStore.write(PageIO.getPageId(buff), buff, 0, false);
+
+                            transferred.add(buff.capacity());
+
+                            if (transferred.longValue() == initMeta.count())
+                                finishRecover(snpTrFut, grpPartId);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                };
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+                String snpName = (String)initMeta.params().get(SNP_NAME_PARAM);
+
+                assert grpId != null;
+                assert partId != null;
+                assert snpName != null;
+                assert storeFactory != null;
+
+                RemoteSnapshotFuture transFut = rmtSnpReq.get();
+
+                if (transFut == null) {
+                    throw new IgniteException("Snapshot transmission with given name doesn't exists " +
+                        "[snpName=" + snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.equals(transFut) || fut0.isCancelled()) {
+                            throw new TransmissionCancelledException("Snapshot request is cancelled [snpName=" + snpName +
+                                ", grpId=" + grpId + ", partId=" + partId + ']');
+                        }
+
+                        busyLock.enterBusy();
+
+                        try {
+                            FilePageStore pageStore = (FilePageStore)storeFactory
+                                .apply(grpId, false)
+                                .createPageStore(getFlagByPartId(partId),
+                                    file::toPath,
+                                    new LongAdderMetric("NO_OP", null));
+
+                            transFut.stores.put(new GroupPartitionId(grpId, partId), pageStore);
+
+                            pageStore.init();
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+                };
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        busyLock.block();
+
+        try {
+            // Try to stop all snapshot processing if it has not been stopped yet.
+            for (SnapshotFutureTask sctx : locSnpTasks.values())
+                sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+            locSnpTasks.clear();
+
+            RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+            if (snpTrFut != null)
+                snpTrFut.cancel();
+
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null) {
+                    clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+                    clusterSnpFut = null;
+                }
+            }
+
+            if (snpRunner != null)
+                snpRunner.shutdownNow();
+
+            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
+            if (discoLsnr != null)
+                cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+
+            cctx.exchange().unregisterExchangeAwareComponent(this);
+        }
+        finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * @param snpDir Snapshot dir.
+     * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
+     */
+    public static void deleteSnapshot(File snpDir, String folderName) {
+        if (!snpDir.exists())
+            return;
+
+        assert snpDir.isDirectory() : snpDir;
+
+        try {
+            File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName);
+            File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), databaseRelativePath(folderName), false);
+
+            U.delete(binDir);
+            U.delete(dbDir);
+
+            File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+            // Concurrently traverse the snapshot marshaller directory and delete all files.
+            Files.walkFileTree(marshDir.toPath(), new SimpleFileVisitor<Path>() {
+                @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    U.delete(file);
+
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override public FileVisitResult visitFileFailed(Path file, IOException exc) {
+                    // Skip files which can be concurrently removed from FileTree.
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+
+            File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+            if (!db.exists() || db.list().length == 0)
+                U.delete(snpDir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return Local snapshot directory for snapshot with given name.
+     */
+    public File snapshotLocalDir(String snpName) {
+        assert locSnpDir != null;
+
+        return new File(locSnpDir, snpName);
+    }
+
+    /**
+     * @return Node snapshot temporary working directory.
+     */
+    public File snapshotTmpDir() {
+        assert tmpWorkDir != null;
+
+        return tmpWorkDir;
+    }
+
+    /**
+     * @param req Snapshot creation request.
+     * @return Future which will be completed when a snapshot has been started.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartStage(SnapshotOperationRequest req) {
+        if (cctx.kernalContext().clientNode() ||
+            !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
+            return new GridFinishedFuture<>();
+
+        // Executed inside discovery notifier thread, prior to firing discovery custom event,
+        // so it is safe to set new snapshot task inside this method without synchronization.
+        if (clusterSnpReq != null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " +
+                "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
+        }
+
+        Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+        leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+            F.node2id()));
+
+        if (!leftNodes.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of baseline nodes left the cluster " +
+                "prior to snapshot operation start: " + leftNodes));
+        }
+
+        Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+        leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+
+        if (!leftGrps.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
+                "on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
+        }
+
+        Map<Integer, Set<Integer>> parts = new HashMap<>();
+
+        // Prepare the collection of cache group and partition pairs to be included in the snapshot.
+        // The cache group context may be 'null' on some nodes, e.g. when a node filter is set.
+        for (Integer grpId : req.grpIds) {
+            if (cctx.cache().cacheGroup(grpId) == null)
+                continue;
+
+            parts.put(grpId, null);
+        }
+
+        if (parts.isEmpty())
+            return new GridFinishedFuture<>();
+
+        SnapshotFutureTask task0 = registerSnapshotTask(req.snpName,
+            req.srcNodeId,
+            parts,
+            locSndrFactory.apply(req.snpName));
+
+        clusterSnpReq = req;
+
+        return task0.chain(fut -> {
+            if (fut.error() == null)
+                return new SnapshotOperationResponse();
+            else
+                throw new GridClosureException(fut.error());
+        });
+    }
+
+    /**
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        if (cctx.kernalContext().clientNode())
+            return;
+
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null || !snpReq.rqId.equals(id)) {
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot operation has not been fully completed " +
+                        "[err=" + err + ", snpReq=" + snpReq + ']'));
+
+                    clusterSnpFut = null;
+                }
+
+                return;
+            }
+        }
+
+        if (isLocalNodeCoordinator(cctx.discovery())) {
+            Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+            missed.removeAll(res.keySet());
+            missed.removeAll(err.keySet());
+
+            snpReq.hasErr = !F.isEmpty(err) || !missed.isEmpty();
+
+            if (snpReq.hasErr) {
+                U.warn(log, "Execution of local snapshot tasks fails or them haven't been executed " +
+                    "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
+                    "[err=" + err + ", missed=" + missed + ']');
+            }
+
+            endSnpProc.start(UUID.randomUUID(), snpReq);
+        }
+    }
+
+    /**
+     * @param req Snapshot creation request.
+     * @return Future which will be completed when the snapshot is finalized.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
+        if (clusterSnpReq == null)
+            return new GridFinishedFuture<>(new SnapshotOperationResponse());
+
+        try {
+            if (req.hasErr)
+                deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+
+            removeLastMetaStorageKey();
+        }
+        catch (Exception e) {
+            return new GridFinishedFuture<>(e);
+        }
+
+        return new GridFinishedFuture<>(new SnapshotOperationResponse());
+    }
+
+    /**
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null)
+            return;
+
+        Set<UUID> endFail = new HashSet<>(snpReq.bltNodes);
+        endFail.removeAll(res.keySet());
+
+        clusterSnpReq = null;
+
+        synchronized (snpOpMux) {
+            if (clusterSnpFut != null) {
+                if (endFail.isEmpty() && !snpReq.hasErr) {
+                    clusterSnpFut.onDone();
+
+                    if (log.isInfoEnabled())
+                        log.info("Cluster-wide snapshot operation finished successfully [req=" + snpReq + ']');
+                }
+                else {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
+                        "Local snapshot tasks may not finished completely or finalizing results fails " +
+                        "[hasErr=" + snpReq.hasErr + ", fail=" + endFail + ", err=" + err + ']'));
+                }
+
+                clusterSnpFut = null;
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if snapshot operation is in progress.
+     */
+    public boolean isSnapshotCreating() {
+        if (clusterSnpReq != null)
+            return true;
+
+        synchronized (snpOpMux) {
+            return clusterSnpReq != null || clusterSnpFut != null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> getSnapshots() {
+        if (cctx.kernalContext().clientNode())
+            throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+        synchronized (snpOpMux) {
+            return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+                .map(File::getName)
+                .collect(Collectors.toList());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> createSnapshot(String name) {
+        A.notNullOrEmpty(name, "name");
+
+        try {
+            if (cctx.kernalContext().clientNode())
+                throw new UnsupportedOperationException("Client and daemon nodes cannot perform this operation.");
+

Review comment:
       Why not support triggering a snapshot from a client node via a compute task?
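       A hedged sketch of how this could look from a client node (illustrative only; it assumes
       the new IgniteSnapshot facade is exposed through Ignite#snapshot(), and the helper and
       snapshot names here are made up for the example):

       ```java
       import org.apache.ignite.Ignite;
       import org.apache.ignite.lang.IgniteCallable;
       import org.apache.ignite.lang.IgniteFuture;
       import org.apache.ignite.resources.IgniteInstanceResource;

       /** Hypothetical helper: triggers a cluster snapshot from a client node. */
       public static IgniteFuture<Void> createSnapshotFromClient(Ignite client, String snpName) {
           // Delegate the call to a server node, since only server nodes can run the operation.
           return client.compute(client.cluster().forServers()).callAsync(new IgniteCallable<Void>() {
               /** Server-side Ignite instance, injected on the executing node. */
               @IgniteInstanceResource
               private transient Ignite ignite;

               @Override public Void call() {
                   // Blocks on the server node until the cluster-wide snapshot completes.
                   ignite.snapshot().createSnapshot(snpName).get();

                   return null;
               }
           });
       }
       ```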

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -0,0 +1,1943 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
+import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * There are two major actions available:
+ * <ul>
+ *     <li>Create a cluster-wide snapshot of cache groups by triggering PME to achieve consistency.</li>
+ *     <li>Create a local snapshot of the requested cache groups and send it to the node that requested this operation.
+ *     Cache groups will be transmitted using the internal API for transferring files. See {@link TransmissionHandler}.</li>
+ * </ul>
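+ * <p>
+ * A rough usage sketch (assuming this manager is exposed through the public
+ * {@link IgniteSnapshot} facade, e.g. via {@code Ignite#snapshot()}):
+ * <pre>
+ * // Create a cluster-wide snapshot and block until it completes.
+ * ignite.snapshot().createSnapshot("mySnapshot").get();
+ * </pre>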
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+    /** File with delta pages suffix. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template consists of delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+    /** Reason message for the checkpoint that is started to enforce a snapshot operation. */
+    public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
+    /** Default directory for loading remote snapshots and storing temporary snapshot files. */
+    public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+    /** Timeout in milliseconds for snapshot operations. */
+    public static final long DFLT_SNAPSHOT_TIMEOUT = 15_000L;
+
+    /** Snapshot in progress error message. */
+    public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected because a snapshot operation is in progress.";
+
+    /** Error message used to finalize snapshot tasks when the local node is stopping. */
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled because the local node " +
+        "is stopping";
+
+    /** Metastorage key to save currently running snapshot. */
+    public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+    /** Snapshot metrics prefix. */
+    public static final String SNAPSHOT_METRICS = "snapshot";
+
+    /** Prefix for snapshot threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+    /** Total number of threads used to perform local snapshot operations. */
+    private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+    /** Default snapshot topic to receive snapshots from remote node. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of the sender node's directory path, including its consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of the cache directory whose partitions are currently being sent. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String SNP_NAME_PARAM = "snpName";
+
+    /** Total number of snapshot files the receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
+    /**
+     * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
+     * It is important to have only one buffer per thread (instead of creating a buffer for each
+     * {@code SnapshotFutureTask.PageStoreSerialWriter}) since that would be redundant and could lead to OOM errors.
+     * A direct buffer is deallocated only when its ByteBuffer is garbage collected, so off-heap memory
+     * may be exhausted before that happens.
+     */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** Map of registered cache snapshot processes and their corresponding contexts. */
+    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+
+    /** Lock to protect the resources in use. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Future of the snapshot requested from a remote node. */
+    private final AtomicReference<RemoteSnapshotFuture> rmtSnpReq = new AtomicReference<>();
+
+    /** Mutex used to order cluster snapshot operation progress. */
+    private final Object snpOpMux = new Object();
+
+    /** Distributed process to start a cluster-wide snapshot operation. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
+
+    /** Distributed process to check the previously performed snapshot operation and delete uncompleted files if needed. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
+
+    /** Resolved persistent data storage settings. */
+    private volatile PdsFolderSettings pdsSettings;
+
+    /** Fully initialized metastorage. */
+    private volatile ReadWriteMetastorage metaStorage;
+
+    /** Local snapshot sender factory. */
+    private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+
+    /** Main snapshot directory to save created snapshots. */
+    private volatile File locSnpDir;
+
+    /**
+     * Working directory for snapshots loaded from remote nodes and for storing
+     * temporary partition delta files of a locally started snapshot process.
+     */
+    private File tmpWorkDir;
+
+    /** Factory for working with delta files as file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+    /** Snapshot thread pool to perform local partition snapshots. */
+    private ExecutorService snpRunner;
+
+    /** System discovery message listener. */
+    private DiscoveryEventListener discoLsnr;
+
+    /** Cluster snapshot operation requested by user. */
+    private ClusterSnapshotFuture clusterSnpFut;
+
+    /** Current snapshot operation on local node. */
+    private volatile SnapshotOperationRequest clusterSnpReq;
+
+    /** {@code true} if the recovery process occurred for a snapshot. */
+    private volatile boolean recovered;
+
+    /** Last seen cluster snapshot operation. */
+    private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        locBuff = ThreadLocal.withInitial(() ->
+            ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize())
+                .order(ByteOrder.nativeOrder()));
+
+        startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::initLocalSnapshotStartStage,
+            this::processLocalSnapshotStartStageResult, SnapshotStartDiscoveryMessage::new);
+
+        endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::initLocalSnapshotEndStage,
+            this::processLocalSnapshotEndStageResult);
+    }
+
+    /**
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File partDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, partDeltaFileName(partId));
+    }
+
+    /**
+     * @param partId Partition id.
+     * @return File name of delta partition pages.
+     */
+    public static String partDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : String.format(PART_DELTA_TEMPLATE, partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(ctx.config()))
+            return;
+
+        snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THREAD_POOL_SIZE,
+            SNAPSHOT_THREAD_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            new OomExceptionHandler(ctx));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+        locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+        tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true);
+
+        U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+        U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
+
+        MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+        mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+            "The system time of the last cluster snapshot request start time on this node.");
+        mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+            "The system time of the last cluster snapshot request end time on this node.");
+        mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, String.class,
+            "The name of last started cluster snapshot request on this node.");
+        mreg.register("LastSnapshotErrorMessage",
+            () -> lastSeenSnpFut.error() == null ? "" : lastSeenSnpFut.error().getMessage(),
+            String.class,
+            "The error message of last started cluster snapshot request which fail with an error. " +
+                "This value will be empty if last snapshot request has been completed successfully.");
+        mreg.register("LocalSnapshotList", this::getSnapshots, List.class,
+            "The list of names of all snapshots currently saved on the local node with respect to " +
+                "the configured via IgniteConfiguration snapshot working path.");
+
+        storeFactory = storeMgr::getPageStoreFactory;
+
+        cctx.exchange().registerExchangeAwareComponent(this);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+        // Receive remote snapshots requests.
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    if (msg instanceof SnapshotRequestMessage) {
+                        SnapshotRequestMessage reqMsg0 = (SnapshotRequestMessage)msg;
+                        String snpName = reqMsg0.snapshotName();
+
+                        synchronized (this) {
+                            SnapshotFutureTask task = lastScheduledRemoteSnapshotTask(nodeId);
+
+                            if (task != null) {
+                                // Task will also be removed from local map due to the listener on future done.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                            }
+                        }
+
+                        SnapshotFutureTask task = registerSnapshotTask(snpName,
+                            nodeId,
+                            reqMsg0.parts(),
+                            remoteSnapshotSender(snpName, nodeId));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process request of creating a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new SnapshotResponseMessage(reqMsg0.snapshotName(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Fail to send the response message with processing snapshot request " +
+                                    "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    else if (msg instanceof SnapshotResponseMessage) {
+                        SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg;
+
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.snpName.equals(respMsg0.snapshotName())) {
+                            if (log.isInfoEnabled()) {
+                                log.info("A stale snapshot response message has been received. Will be ignored " +
+                                    "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                            }
+
+                            return;
+                        }
+
+                        if (respMsg0.errorMessage() != null) {
+                            fut0.onDone(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                                "on the remote node with an error: " + respMsg0.errorMessage()));
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                    cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
+
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                UUID leftNodeId = evt.eventNode().id();
+
+                if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+                    SnapshotOperationRequest snpReq = clusterSnpReq;
+
+                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                        if (sctx.sourceNodeId().equals(leftNodeId) ||
+                            (snpReq != null &&
+                                snpReq.snpName.equals(sctx.snapshotName()) &&
+                                snpReq.bltNodes.contains(leftNodeId))) {
+                            sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
+                                "One of baseline nodes left the cluster: " + leftNodeId));
+                        }
+                    }
+
+                    RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                    if (snpTrFut != null && snpTrFut.rmtNodeId.equals(leftNodeId)) {
+                        snpTrFut.onDone(new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                            "requested left the grid"));
+                    }
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        // Remote snapshot handler.
+        cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, new TransmissionHandler() {
+            @Override public void onEnd(UUID nodeId) {
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                assert snpTrFut.stores.isEmpty() : snpTrFut.stores.entrySet();
+                assert snpTrFut.partsLeft == 0 : snpTrFut;
+
+                snpTrFut.onDone();
+
+                log.info("Requested snapshot from remote node has been fully received " +
+                    "[snpName=" + snpTrFut.snpName + ", snpTrans=" + snpTrFut + ']');
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onException(UUID nodeId, Throwable err) {
+                RemoteSnapshotFuture fut = rmtSnpReq.get();
+
+                if (fut == null)
+                    return;
+
+                if (fut.rmtNodeId.equals(nodeId))
+                    fut.onDone(err);
+            }
+
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+                String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+                String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+                RemoteSnapshotFuture transFut = resolve(nodeId, fileMeta);
+
+                try {
+                    File cacheDir = U.resolveWorkDirectory(tmpWorkDir.getAbsolutePath(),
+                        Paths.get(transFut.snpName, rmtDbNodePath, cacheDirName).toString(),
+                        false);
+
+                    return new File(cacheDir, getPartitionFileName(partId)).getAbsolutePath();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            /**
+             * @param nodeId Remote node id.
+             * @param meta Transmission meta.
+             * @return Resolved transmission future.
+             */
+            private RemoteSnapshotFuture resolve(UUID nodeId, TransmissionMeta meta) {
+                String snpName = (String)meta.params().get(SNP_NAME_PARAM);
+                Integer partsCnt = (Integer)meta.params().get(SNP_PARTITIONS_CNT);
+
+                RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+                if (snpTrFut == null || !snpTrFut.snpName.equals(snpName)) {
+                    throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                        "[snpName=" + snpName + ", meta=" + meta + ", snpTrFut=" + snpTrFut + ']');
+                }
+
+                assert snpTrFut.snpName.equals(snpName) && snpTrFut.rmtNodeId.equals(nodeId) :
+                    "Another transmission in progress [snpTrFut=" + snpTrFut + ", nodeId=" + snpName + ']';
+
+                if (snpTrFut.partsLeft == -1)
+                    snpTrFut.partsLeft = partsCnt;
+
+                return snpTrFut;
+            }
+
+            /**
+             * @param snpTrans Current snapshot transmission.
+             * @param grpPartId Pair of group id and its partition id.
+             */
+            private void finishRecover(RemoteSnapshotFuture snpTrans, GroupPartitionId grpPartId) {
+                FilePageStore pageStore = null;
+
+                try {
+                    pageStore = snpTrans.stores.remove(grpPartId);
+
+                    pageStore.finishRecover();
+
+                    snpTrans.partConsumer.accept(new File(pageStore.getFileAbsolutePath()), grpPartId);
+
+                    snpTrans.partsLeft--;
+                }
+                catch (StorageException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    U.closeQuiet(pageStore);
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+
+                RemoteSnapshotFuture snpTrFut = resolve(nodeId, initMeta);
+
+                GroupPartitionId grpPartId = new GroupPartitionId(grpId, partId);
+                FilePageStore pageStore = snpTrFut.stores.get(grpPartId);
+
+                if (pageStore == null) {
+                    throw new IgniteException("Partition must be loaded before applying snapshot delta pages " +
+                        "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                pageStore.beginRecover();
+
+                // No snapshot delta pages received. Finalize recovery.
+                if (initMeta.count() == 0)
+                    finishRecover(snpTrFut, grpPartId);
+
+                return new Consumer<ByteBuffer>() {
+                    final LongAdder transferred = new LongAdder();
+
+                    @Override public void accept(ByteBuffer buff) {
+                        try {
+                            assert initMeta.count() != 0 : initMeta;
+
+                            RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                            if (fut0 == null || !fut0.equals(snpTrFut) || fut0.isCancelled()) {
+                                throw new TransmissionCancelledException("Snapshot request is cancelled " +
+                                    "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                            }
+
+                            pageStore.write(PageIO.getPageId(buff), buff, 0, false);
+
+                            transferred.add(buff.capacity());
+
+                            if (transferred.longValue() == initMeta.count())
+                                finishRecover(snpTrFut, grpPartId);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                };
+            }
+
+            /** {@inheritDoc} */
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+                Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+                String snpName = (String)initMeta.params().get(SNP_NAME_PARAM);
+
+                assert grpId != null;
+                assert partId != null;
+                assert snpName != null;
+                assert storeFactory != null;
+
+                RemoteSnapshotFuture transFut = rmtSnpReq.get();
+
+                if (transFut == null) {
+                    throw new IgniteException("Snapshot transmission with given name doesn't exists " +
+                        "[snpName=" + snpName + ", grpId=" + grpId + ", partId=" + partId + ']');
+                }
+
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        RemoteSnapshotFuture fut0 = rmtSnpReq.get();
+
+                        if (fut0 == null || !fut0.equals(transFut) || fut0.isCancelled()) {
+                            throw new TransmissionCancelledException("Snapshot request is cancelled [snpName=" + snpName +
+                                ", grpId=" + grpId + ", partId=" + partId + ']');
+                        }
+
+                        busyLock.enterBusy();
+
+                        try {
+                            FilePageStore pageStore = (FilePageStore)storeFactory
+                                .apply(grpId, false)
+                                .createPageStore(getFlagByPartId(partId),
+                                    file::toPath,
+                                    new LongAdderMetric("NO_OP", null));
+
+                            transFut.stores.put(new GroupPartitionId(grpId, partId), pageStore);
+
+                            pageStore.init();
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                        finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+                };
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        busyLock.block();
+
+        try {
+            // Try to stop all snapshot processing if not yet stopped.
+            for (SnapshotFutureTask sctx : locSnpTasks.values())
+                sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+            locSnpTasks.clear();
+
+            RemoteSnapshotFuture snpTrFut = rmtSnpReq.get();
+
+            if (snpTrFut != null)
+                snpTrFut.cancel();
+
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null) {
+                    clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+                    clusterSnpFut = null;
+                }
+            }
+
+            if (snpRunner != null)
+                snpRunner.shutdownNow();
+
+            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
+            if (discoLsnr != null)
+                cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+
+            cctx.exchange().unregisterExchangeAwareComponent(this);
+        }
+        finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * @param snpDir Snapshot dir.
+     * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
+     */
+    public static void deleteSnapshot(File snpDir, String folderName) {
+        if (!snpDir.exists())
+            return;
+
+        assert snpDir.isDirectory() : snpDir;
+
+        try {
+            File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName);
+            File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), databaseRelativePath(folderName), false);
+
+            U.delete(binDir);
+            U.delete(dbDir);
+
+            File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+            // Traverse the snapshot marshaller directory and delete all files; entries removed concurrently are skipped.
+            Files.walkFileTree(marshDir.toPath(), new SimpleFileVisitor<Path>() {
+                @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    U.delete(file);
+
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override public FileVisitResult visitFileFailed(Path file, IOException exc) {
+                    // Skip files which can be concurrently removed from FileTree.
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+
+            File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+            if (!db.exists() || db.list().length == 0)
+                U.delete(snpDir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param snpName Snapshot name.
+     * @return Local snapshot directory for snapshot with given name.
+     */
+    public File snapshotLocalDir(String snpName) {
+        assert locSnpDir != null;
+
+        return new File(locSnpDir, snpName);

Review comment:
       From the API standpoint there is no indication that the snapshot name will be used as a folder name. Proper path escaping and translation logic should be added, along with a corresponding test.
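
       For instance, a minimal validation sketch (illustrative only, not from this PR; A.notNullOrEmpty
       is already used in createSnapshot, and U.maskForFileName is an existing utility):

       ```java
       /** Hypothetical check: ensures the snapshot name can be used verbatim as a directory name. */
       private static void validateSnapshotName(String snpName) {
           A.notNullOrEmpty(snpName, "snpName");

           // Allow only a conservative character set so the name maps one-to-one to a folder name.
           if (!snpName.matches("[a-zA-Z0-9_-]+")) {
               throw new IllegalArgumentException("Snapshot name must contain only letters, digits, " +
                   "underscores and dashes: " + snpName);
           }
       }
       ```

       Alternatively, the name could be translated with U.maskForFileName before being used as a folder,
       as long as the reverse mapping in getSnapshots() stays consistent.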




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org