You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2020/05/01 08:59:27 UTC

[ignite] branch master updated: IGNITE-11073: Cluster snapshots of persisted caches (#7760)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad8944  IGNITE-11073: Cluster snapshots of persisted caches (#7760)
5ad8944 is described below

commit 5ad8944992e49788828e915a8d068f3706616f9a
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Fri May 1 11:58:58 2020 +0300

    IGNITE-11073: Cluster snapshots of persisted caches (#7760)
---
 ...IgnitePersistenceCompatibilityAbstractTest.java |    2 +-
 .../src/main/java/org/apache/ignite/Ignite.java    |    5 +
 .../java/org/apache/ignite/IgniteSnapshot.java     |   42 +
 .../ignite/configuration/IgniteConfiguration.java  |   31 +
 .../org/apache/ignite/internal/IgniteFeatures.java |    3 +
 .../org/apache/ignite/internal/IgniteKernal.java   |   11 +-
 .../ignite/internal/MarshallerContextImpl.java     |   97 +-
 .../internal/MarshallerMappingFileStore.java       |   29 +-
 .../internal/managers/IgniteMBeansManager.java     |    6 +
 .../managers/discovery/GridDiscoveryManager.java   |   34 +-
 .../ignite/internal/pagemem/store/PageStore.java   |   16 +-
 .../internal/pagemem/store/PageWriteListener.java  |   35 +
 .../processors/cache/ClusterCachesInfo.java        |   22 +-
 .../internal/processors/cache/ExchangeContext.java |    6 +-
 .../processors/cache/GridCacheProcessor.java       |    3 +
 .../processors/cache/GridCacheSharedContext.java   |   17 +
 .../cache/binary/BinaryMetadataFileStore.java      |   33 +-
 .../binary/CacheObjectBinaryProcessorImpl.java     |   55 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |   32 +-
 .../cache/persistence/DbCheckpointListener.java    |    6 +
 .../GridCacheDatabaseSharedManager.java            |   26 +-
 .../cache/persistence/file/FilePageStore.java      |   57 +-
 .../persistence/file/FilePageStoreManager.java     |  269 +++--
 .../persistence/partstate/GroupPartitionId.java    |    2 +-
 .../snapshot/IgniteCacheSnapshotManager.java       |    4 +-
 .../snapshot/IgniteSnapshotManager.java            | 1233 ++++++++++++++++++++
 .../persistence/snapshot/SnapshotFutureTask.java   | 1010 ++++++++++++++++
 .../persistence/snapshot/SnapshotMXBeanImpl.java   |   41 +
 .../cache/persistence/snapshot/SnapshotSender.java |  234 ++++
 .../wal/reader/IgniteWalIteratorFactory.java       |    2 +-
 .../wal/reader/StandaloneGridKernalContext.java    |    5 +-
 .../cacheobject/IgniteCacheObjectProcessor.java    |    8 +-
 .../marshaller/GridMarshallerMappingProcessor.java |   15 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |   14 +-
 .../util/distributed/DistributedProcess.java       |   50 +-
 .../internal/util/distributed/InitMessage.java     |    2 +-
 .../org/apache/ignite/mxbean/SnapshotMXBean.java   |   35 +
 .../GridCacheRebalancingSyncSelfTest.java          |    2 +-
 .../IgnitePdsBinaryMetadataAsyncWritingTest.java   |    5 +-
 ...gnitePdsBinaryMetadataOnClusterRestartTest.java |    6 +-
 .../IgnitePdsNoSpaceLeftOnDeviceTest.java          |    5 +-
 .../db/wal/IgniteWalIteratorSwitchSegmentTest.java |    2 +
 .../pagemem/BPlusTreePageMemoryImplTest.java       |    1 +
 .../BPlusTreeReuseListPageMemoryImplTest.java      |    1 +
 .../pagemem/IndexStoragePageMemoryImplTest.java    |    1 +
 .../pagemem/PageMemoryImplNoLoadTest.java          |    1 +
 .../persistence/pagemem/PageMemoryImplTest.java    |    1 +
 .../snapshot/AbstractSnapshotSelfTest.java         |  513 ++++++++
 .../snapshot/IgniteClusterSnapshotSelfTest.java    |  945 +++++++++++++++
 .../snapshot/IgniteSnapshotMXBeanTest.java         |   80 ++
 .../snapshot/IgniteSnapshotManagerSelfTest.java    |  439 +++++++
 .../internal/processors/igfs/IgfsIgniteMock.java   |    8 +
 .../loadtests/hashmap/GridCacheTestContext.java    |    1 +
 .../ignite/platform/PlatformDeployServiceTask.java |   11 +-
 .../apache/ignite/testframework/GridTestUtils.java |   17 +-
 .../testframework/junits/GridAbstractTest.java     |    5 +-
 .../ignite/testframework/junits/IgniteMock.java    |    6 +
 .../junits/common/GridCommonAbstractTest.java      |    7 +-
 .../junits/multijvm/IgniteProcessProxy.java        |    6 +
 .../IgniteBasicWithPersistenceTestSuite.java       |    9 +-
 .../cache/metric/SqlViewExporterSpiTest.java       |    4 +-
 .../IgniteClusterSnapshotWithIndexesTest.java      |  274 +++++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |    4 +-
 .../java/org/apache/ignite/IgniteSpringBean.java   |    5 +
 64 files changed, 5613 insertions(+), 238 deletions(-)

diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
index b08f6f7..3aa75a5 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/persistence/IgnitePersistenceCompatibilityAbstractTest.java
@@ -24,8 +24,8 @@ import org.apache.ignite.compatibility.testframework.junits.IgniteCompatibilityA
 import org.apache.ignite.compatibility.testframework.util.CompatibilityTestsUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
-import static org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext.BINARY_META_FOLDER;
 
 /**
  * Super class for all persistence compatibility tests.
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 14b5797..b1e3b54 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -733,4 +733,9 @@ public interface Ignite extends AutoCloseable {
      * @return Instance of {@link IgniteEncryption} interface.
      */
     public IgniteEncryption encryption();
+
+    /**
+     * @return Snapshot manager.
+     */
+    public IgniteSnapshot snapshot();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
new file mode 100644
index 0000000..753d427
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * This interface provides functionality for creating cluster-wide cache data snapshots.
+ * <p>
+ * Current limitations:
+ * <ul>
+ * <li>Snapshot will trigger PME (partition map exchange) to run itself.</li>
+ * <li>Snapshot will be taken from all registered persistence caches to
+ * guarantee data consistency between them.</li>
+ * <li>Snapshot must be restored manually on the switched off cluster by copying data
+ * to the working directory on each cluster node.</li>
+ * </ul>
+ */
+public interface IgniteSnapshot {
+    /**
+     * Create a consistent copy of all persistence cache groups from the whole cluster.
+     *
+     * @param name Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
+     * @return Future which will be completed when a process ends.
+     */
+    public IgniteFuture<Void> createSnapshot(String name);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 2ba4e29..313d462 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -221,6 +221,9 @@ public class IgniteConfiguration {
     /** Default value for cache sanity check enabled flag. */
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
+    /** Default relative working directory path for snapshot operation result. */
+    public static final String DFLT_SNAPSHOT_DIRECTORY = "snapshots";
+
     /** Default value for late affinity assignment flag. */
     @Deprecated
     public static final boolean DFLT_LATE_AFF_ASSIGNMENT = true;
@@ -552,6 +555,13 @@ public class IgniteConfiguration {
     /** Page memory configuration. */
     private DataStorageConfiguration dsCfg;
 
+    /**
+     * Directory where all results of snapshot operations will be stored. The internal
+     * {@link U#resolveWorkDirectory(String, String, boolean)} is used to configure
+     * snapshot working directory.
+     */
+    private String snapshotPath = DFLT_SNAPSHOT_DIRECTORY;
+
     /** Active on start flag. */
     @Deprecated
     private boolean activeOnStart = DFLT_ACTIVE_ON_START;
@@ -711,6 +721,7 @@ public class IgniteConfiguration {
         segPlc = cfg.getSegmentationPolicy();
         segResolveAttempts = cfg.getSegmentationResolveAttempts();
         segResolvers = cfg.getSegmentationResolvers();
+        snapshotPath = cfg.getSnapshotPath();
         sndRetryCnt = cfg.getNetworkSendRetryCount();
         sndRetryDelay = cfg.getNetworkSendRetryDelay();
         sqlConnCfg = cfg.getSqlConnectorConfiguration();
@@ -3153,6 +3164,26 @@ public class IgniteConfiguration {
     }
 
     /**
+     * @return By default the relative {@link #DFLT_SNAPSHOT_DIRECTORY} is used. The value can be
+     * configured as relative path starting from the Ignites {@link #getWorkDirectory()} or
+     * the value can be represented as an absolute snapshot working path.
+     */
+    public String getSnapshotPath() {
+        return snapshotPath;
+    }
+
+    /**
+     * @param snapshotPath By default the relative {@link #DFLT_SNAPSHOT_DIRECTORY} is used.
+     * The value can be configured as relative path starting from the Ignites {@link #getWorkDirectory()}
+     * or the value can be represented as an absolute snapshot working path instead.
+     */
+    public IgniteConfiguration setSnapshotPath(String snapshotPath) {
+        this.snapshotPath = snapshotPath;
+
+        return this;
+    }
+
+    /**
      * Gets grid warmup closure. This closure will be executed before actual grid instance start. Configuration of
      * a starting instance will be passed to the closure so it can decide what operations to warm up.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
index a5f7b4f..0bba3f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java
@@ -95,6 +95,9 @@ public enum IgniteFeatures {
      */
     SAFE_CLUSTER_DEACTIVATION(22),
 
+    /** Persistence caches can be snapshot.  */
+    PERSISTENCE_CACHE_SNAPSHOT(23),
+
     /** Long operations dump timeout. */
     LONG_OPERATIONS_DUMP_TIMEOUT(30);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 3685d71..7138214 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -84,6 +84,7 @@ import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
@@ -246,7 +247,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
-import static org.apache.ignite.IgniteSystemProperties.snapshot;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
 import static org.apache.ignite.internal.GridKernalState.STARTED;
 import static org.apache.ignite.internal.GridKernalState.STARTING;
@@ -1820,7 +1820,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             // Stick all system properties into node's attributes overwriting any
             // identical names from environment properties.
-            for (Map.Entry<Object, Object> e : snapshot().entrySet()) {
+            for (Map.Entry<Object, Object> e : IgniteSystemProperties.snapshot().entrySet()) {
                 String key = (String)e.getKey();
 
                 if (incProps == null || U.containsStringArray(incProps, key, true) ||
@@ -2829,7 +2829,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         assert log != null;
 
         if (log.isDebugEnabled() && S.includeSensitive())
-            for (Map.Entry<Object, Object> entry : snapshot().entrySet())
+            for (Map.Entry<Object, Object> entry : IgniteSystemProperties.snapshot().entrySet())
                 log.debug("System property [" + entry.getKey() + '=' + entry.getValue() + ']');
     }
 
@@ -4042,6 +4042,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSnapshot snapshot() {
+        return ctx.cache().context().snapshotMgr();
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<MemoryMetrics> memoryMetrics() {
         return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index e355300..a4625ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -32,6 +32,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -168,27 +170,73 @@ public class MarshallerContextImpl implements MarshallerContext {
     }
 
     /**
-     * @param platformId Platform id.
-     * @param marshallerMappings All marshaller mappings for given platformId.
-     * @throws IgniteCheckedException In case of failure to process incoming marshaller mappings.
+     * @param log Ignite logger.
+     * @param mappings All marshaller mappings to write.
      */
-    public void onMappingDataReceived(byte platformId, Map<Integer, MappedName> marshallerMappings)
-        throws IgniteCheckedException
-    {
-        ConcurrentMap<Integer, MappedName> platformCache = getCacheFor(platformId);
+    public void onMappingDataReceived(IgniteLogger log, List<Map<Integer, MappedName>> mappings) {
+        addPlatformMappings(log,
+            mappings,
+            this::getCacheFor,
+            (mappedName, clsName) ->
+                mappedName == null || F.isEmpty(clsName) || !clsName.equals(mappedName.className()),
+            fileStore);
+    }
 
-        for (Map.Entry<Integer, MappedName> e : marshallerMappings.entrySet()) {
-            int typeId = e.getKey();
-            String clsName = e.getValue().className();
+    /**
+     * @param ctx Kernal context.
+     * @param mappings Marshaller mappings to save.
+     * @param dir Directory to save given mappings to.
+     */
+    public static void saveMappings(GridKernalContext ctx, List<Map<Integer, MappedName>> mappings, File dir) {
+        MarshallerMappingFileStore writer = new MarshallerMappingFileStore(ctx,
+            mappingFileStoreWorkDir(dir.getAbsolutePath()));
+
+        addPlatformMappings(ctx.log(MarshallerContextImpl.class),
+            mappings,
+            b -> new ConcurrentHashMap<>(),
+            (mappedName, clsName) -> true,
+            writer);
+    }
 
-            MappedName mappedName = platformCache.get(typeId);
+    /**
+     * @param mappings Map of marshaller mappings.
+     * @param mappedCache Cache to attach new mappings to.
+     * @param cacheAddPred Check mapping can be added.
+     * @param writer Persistence mapping writer.
+     */
+    private static void addPlatformMappings(
+        IgniteLogger log,
+        List<Map<Integer, MappedName>> mappings,
+        Function<Byte, ConcurrentMap<Integer, MappedName>> mappedCache,
+        BiPredicate<MappedName, String> cacheAddPred,
+        MarshallerMappingFileStore writer
+    ) {
+        if (mappings == null)
+            return;
 
-            if (mappedName != null && !F.isEmpty(clsName) && clsName.equals(mappedName.className()))
+        for (byte platformId = 0; platformId < mappings.size(); platformId++) {
+            Map<Integer, MappedName> attach = mappings.get(platformId);
+
+            if (attach == null)
                 continue;
 
-            platformCache.put(typeId, new MappedName(clsName, true));
+            ConcurrentMap<Integer, MappedName> cached = mappedCache.apply(platformId);
+
+            for (Map.Entry<Integer, MappedName> e : attach.entrySet()) {
+                Integer typeId = e.getKey();
+                String clsName = e.getValue().className();
+
+                if (cacheAddPred.test(cached.get(typeId), clsName)) {
+                    try {
+                        cached.put(typeId, new MappedName(clsName, true));
 
-            fileStore.mergeAndWriteMapping(platformId, typeId, clsName);
+                        writer.mergeAndWriteMapping(platformId, typeId, clsName);
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(log, "Failed to write marshaller mapping data", ex);
+                    }
+                }
+            }
         }
     }
 
@@ -198,7 +246,7 @@ public class MarshallerContextImpl implements MarshallerContext {
      * @param fileName File name.
      */
     public void checkHasClassName(String clsName, ClassLoader ldr, String fileName) {
-        ConcurrentMap cache = getCacheFor(JAVA_ID);
+        ConcurrentMap<Integer, MappedName> cache = getCacheFor(JAVA_ID);
 
         if (!cache.containsKey(clsName.hashCode()))
             throw new IgniteException("Failed to read class name from class names properties file. " +
@@ -502,10 +550,10 @@ public class MarshallerContextImpl implements MarshallerContext {
         IgniteConfiguration cfg = ctx.config();
         String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
 
-        final IgniteLogger fileStoreLog = ctx.log(MarshallerMappingFileStore.class);
         fileStore = marshallerMappingFileStoreDir == null ?
-            new MarshallerMappingFileStore(workDir, fileStoreLog) :
-            new MarshallerMappingFileStore(fileStoreLog, marshallerMappingFileStoreDir);
+            new MarshallerMappingFileStore(ctx, mappingFileStoreWorkDir(workDir)) :
+            new MarshallerMappingFileStore(ctx, marshallerMappingFileStoreDir);
+
         this.transport = transport;
         closProc = ctx.closure();
         clientNode = ctx.clientNode();
@@ -515,6 +563,19 @@ public class MarshallerContextImpl implements MarshallerContext {
     }
 
     /**
+     * @param igniteWorkDir Base ignite working directory.
+     * @return Resolved directory.
+     */
+    public static File mappingFileStoreWorkDir(String igniteWorkDir) {
+        try {
+            return U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
      *
      */
     public void onMarshallerProcessorStop() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
index 8709d77..7945352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerMappingFileStore.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.internal;
 
 import java.io.BufferedReader;
@@ -45,6 +46,9 @@ import org.apache.ignite.marshaller.MarshallerContext;
  * when a classname is requested but is not presented in local cache of {@link MarshallerContextImpl}.
  */
 final class MarshallerMappingFileStore {
+    /** */
+    private static final String FILE_EXTENSION = ".classname";
+
     /** File lock timeout in milliseconds. */
     private static final int FILE_LOCK_TIMEOUT_MS = 5000;
 
@@ -57,26 +61,15 @@ final class MarshallerMappingFileStore {
     /** Marshaller mapping directory */
     private final File workDir;
 
-    /** */
-    private final String FILE_EXTENSION = ".classname";
-
     /**
-     * @param igniteWorkDir Ignite work directory
-     * @param log Logger.
-     */
-    MarshallerMappingFileStore(String igniteWorkDir, IgniteLogger log) throws IgniteCheckedException {
-        workDir = U.resolveWorkDirectory(igniteWorkDir, "marshaller", false);
-        this.log = log;
-    }
-
-    /**
-     * Creates marshaller mapping file store with custom predefined work directory
-     * @param log logger.
-     * @param marshallerMappingFileStoreDir custom marshaller work directory
+     * Creates marshaller mapping file store with custom predefined work directory.
+     *
+     * @param workDir custom marshaller work directory.
+     * @param kctx Grid kernal context.
      */
-    MarshallerMappingFileStore(final IgniteLogger log, final File marshallerMappingFileStoreDir) {
-        this.workDir = marshallerMappingFileStoreDir;
-        this.log = log;
+    MarshallerMappingFileStore(GridKernalContext kctx, final File workDir) {
+        this.workDir = workDir;
+        log = kctx.log(MarshallerMappingFileStore.class);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
index d31f3ba..03750d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.TransactionMetricsMxBeanImpl;
 import org.apache.ignite.internal.TransactionsMXBeanImpl;
 import org.apache.ignite.internal.managers.encryption.EncryptionMXBeanImpl;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMXBeanImpl;
 import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl;
 import org.apache.ignite.internal.processors.metric.MetricsMxBeanImpl;
 import org.apache.ignite.internal.util.StripedExecutor;
@@ -55,6 +56,7 @@ import org.apache.ignite.mxbean.IgniteMXBean;
 import org.apache.ignite.mxbean.MetricsMxBean;
 import org.apache.ignite.mxbean.ServiceMXBean;
 import org.apache.ignite.mxbean.QueryMXBean;
+import org.apache.ignite.mxbean.SnapshotMXBean;
 import org.apache.ignite.mxbean.StripedExecutorMXBean;
 import org.apache.ignite.mxbean.ThreadPoolMXBean;
 import org.apache.ignite.mxbean.TransactionMetricsMxBean;
@@ -179,6 +181,10 @@ public class IgniteMBeansManager {
         registerMBean("Encryption", encryptionMXBean.getClass().getSimpleName(), encryptionMXBean,
             EncryptionMXBean.class);
 
+        // Snapshot.
+        SnapshotMXBean snpMXBean = new SnapshotMXBeanImpl(ctx);
+        registerMBean("Snapshot", snpMXBean.getClass().getSimpleName(), snpMXBean, SnapshotMXBean.class);
+
         // Metrics configuration
         MetricsMxBean metricsMxBean = new MetricsMxBeanImpl(ctx.metric(), log);
         registerMBean("Metrics", metricsMxBean.getClass().getSimpleName(), metricsMxBean, MetricsMxBean.class);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 23b3f19..828f18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -660,23 +660,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState());
                 }
 
-                if (type == EVT_DISCOVERY_CUSTOM_EVT) {
-                    for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
-                        List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
-
-                        if (list != null) {
-                            for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
-                                try {
-                                    lsnr.onCustomEvent(nextTopVer, node, customMsg);
-                                }
-                                catch (Exception e) {
-                                    U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
-                                }
-                            }
-                        }
-                    }
-                }
-
                 DiscoCache discoCache;
 
                 // Put topology snapshot into discovery history.
@@ -739,6 +722,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
+                if (type == EVT_DISCOVERY_CUSTOM_EVT) {
+                    for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
+                        List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls);
+
+                        if (list != null) {
+                            for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+                                try {
+                                    lsnr.onCustomEvent(nextTopVer, node, customMsg);
+                                }
+                                catch (Exception e) {
+                                    U.error(log, "Failed to notify direct custom event listener: " + customMsg, e);
+                                }
+                            }
+                        }
+                    }
+                }
+
                 // If this is a local join event, just save it and do not notify listeners.
                 if (locJoinEvt) {
                     if (gridStartTime == 0)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
index 7c1e15d..1d9e501 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagemem.store;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -24,7 +25,17 @@ import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 /**
  * Persistent store of pages.
  */
-public interface PageStore {
+public interface PageStore extends Closeable {
+    /**
+     * @param lsnr Page write listener to set.
+     */
+    public void addWriteListener(PageWriteListener lsnr);
+
+    /**
+     * @param lsnr Page write listener to remove.
+     */
+    public void removeWriteListener(PageWriteListener lsnr);
+
     /**
      * Checks if page exists.
      *
@@ -53,9 +64,10 @@ public interface PageStore {
      * @param pageId Page ID.
      * @param pageBuf Page buffer to read into.
      * @param keepCrc by default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc
+     * @return {@code true} if page has been read successfully, {@code false} if page hasn't been written yet.
      * @throws IgniteCheckedException If reading failed (IO error occurred).
      */
-    public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;
+    public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;
 
     /**
      * Reads a header.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java
new file mode 100644
index 0000000..2f1b507
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageWriteListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.store;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+
+/**
+ * Each page write attempt to a {@link FilePageStore} may be covered by such listener.
+ * If necessary, the page data can be handled by another process prior to actually
+ * being written to the PageStore.
+ */
+@FunctionalInterface
+public interface PageWriteListener {
+    /**
+     * @param pageId Handled page id.
+     * @param buf Buffer with data.
+     */
+    public void accept(long pageId, ByteBuffer buf);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 510cb6e..0062eb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -84,6 +84,7 @@ import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CACHE_PROC;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
 
 /**
  * Logic related to cache discovery data processing.
@@ -762,7 +763,8 @@ public class ClusterCachesInfo {
                     return;
                 }
 
-                processStopCacheRequest(exchangeActions, req, cacheName, desc);
+                if (!processStopCacheRequest(exchangeActions, req, res, cacheName, desc))
+                    return;
 
                 needExchange = true;
             }
@@ -783,13 +785,27 @@ public class ClusterCachesInfo {
      * @param exchangeActions Exchange actions to update.
      * @param cacheName Cache name.
      * @param desc Dynamic cache descriptor.
+     * @return {@code true} if the stop request can proceed.
      */
-    private void processStopCacheRequest(
+    private boolean processStopCacheRequest(
         ExchangeActions exchangeActions,
         DynamicCacheChangeRequest req,
+        CacheChangeProcessResult res,
         String cacheName,
         DynamicCacheDescriptor desc
     ) {
+        if (ctx.cache().context().snapshotMgr().isSnapshotCreating()) {
+            IgniteCheckedException err = new IgniteCheckedException(SNP_IN_PROGRESS_ERR_MSG);
+
+            U.warn(log, err);
+
+            res.errs.add(err);
+
+            ctx.cache().completeCacheStartFuture(req, false, err);
+
+            return false;
+        }
+
         DynamicCacheDescriptor old = registeredCaches.get(cacheName);
 
         assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
@@ -834,6 +850,8 @@ public class ClusterCachesInfo {
                 }
             }
         }
+
+        return true;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index f9e9376..dab45e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -31,6 +31,7 @@ import static org.apache.ignite.internal.IgniteFeatures.PME_FREE_SWITCH;
 import static org.apache.ignite.internal.IgniteFeatures.allNodesSupports;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.exchangeProtocolVersion;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
 
 /**
  *
@@ -76,9 +77,10 @@ public class ExchangeContext {
             log.warning("Current topology does not support the PME-free switch. Please check all nodes support" +
                 " this feature and it was not explicitly disabled by IGNITE_PME_FREE_SWITCH_DISABLED JVM option.");
 
+        boolean pmeFreeAvailable = (fut.wasRebalanced() && fut.isBaselineNodeFailed()) || isSnapshotOperation(fut.firstEvent());
+
         if (!compatibilityNode &&
-            fut.wasRebalanced() &&
-            fut.isBaselineNodeFailed() &&
+            pmeFreeAvailable &&
             allNodesSupportsPmeFreeSwitch) {
             exchangeFreeSwitch = true;
             merge = false;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dea3aca..326f24d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
@@ -2983,6 +2984,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         WalStateManager walStateMgr = new WalStateManager(ctx);
 
+        IgniteSnapshotManager snapshotMgr = new IgniteSnapshotManager(ctx);
         IgniteCacheSnapshotManager snpMgr = ctx.plugins().createComponent(IgniteCacheSnapshotManager.class);
 
         if (snpMgr == null)
@@ -3010,6 +3012,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             walMgr,
             walStateMgr,
             dbMgr,
+            snapshotMgr,
             snpMgr,
             depMgr,
             exchMgr,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index e30f57e..ab496be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterNode;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -124,6 +126,9 @@ public class GridCacheSharedContext<K, V> {
     /** Page store manager. {@code Null} if persistence is not enabled. */
     @Nullable private IgnitePageStoreManager pageStoreMgr;
 
+    /** Snapshot manager for persistent caches. See {@link IgniteSnapshot}. */
+    private IgniteSnapshotManager snapshotMgr;
+
     /** Affinity manager. */
     private CacheAffinitySharedManager affMgr;
 
@@ -217,6 +222,7 @@ public class GridCacheSharedContext<K, V> {
         @Nullable IgniteWriteAheadLogManager walMgr,
         WalStateManager walStateMgr,
         IgniteCacheDatabaseSharedManager dbMgr,
+        IgniteSnapshotManager snapshotMgr,
         IgniteCacheSnapshotManager snpMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
@@ -242,6 +248,7 @@ public class GridCacheSharedContext<K, V> {
             walMgr,
             walStateMgr,
             dbMgr,
+            snapshotMgr,
             snpMgr,
             depMgr,
             exchMgr,
@@ -421,6 +428,7 @@ public class GridCacheSharedContext<K, V> {
             walMgr,
             walStateMgr,
             dbMgr,
+            snapshotMgr,
             snpMgr,
             new GridCacheDeploymentManager<K, V>(),
             new GridCachePartitionExchangeManager<K, V>(),
@@ -470,6 +478,7 @@ public class GridCacheSharedContext<K, V> {
         IgniteWriteAheadLogManager walMgr,
         WalStateManager walStateMgr,
         IgniteCacheDatabaseSharedManager dbMgr,
+        IgniteSnapshotManager snapshotMgr,
         IgniteCacheSnapshotManager snpMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
@@ -489,6 +498,7 @@ public class GridCacheSharedContext<K, V> {
         this.walMgr = add(mgrs, walMgr);
         this.walStateMgr = add(mgrs, walStateMgr);
         this.dbMgr = add(mgrs, dbMgr);
+        this.snapshotMgr = add(mgrs, snapshotMgr);
         this.snpMgr = add(mgrs, snpMgr);
         this.jtaMgr = add(mgrs, jtaMgr);
         this.depMgr = add(mgrs, depMgr);
@@ -740,6 +750,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Page storage snapshot manager.
+     */
+    public IgniteSnapshotManager snapshotMgr() {
+        return snapshotMgr;
+    }
+
+    /**
      * @return Write ahead log manager.
      */
     public IgniteWriteAheadLogManager wal() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index 924cf14..a3ffa62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
@@ -40,7 +41,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Class handles saving/restoring binary metadata to/from disk.
@@ -74,41 +74,34 @@ class BinaryMetadataFileStore {
      * @param metadataLocCache Metadata locale cache.
      * @param ctx Context.
      * @param log Logger.
-     * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta
-     * and consistentId
      */
     BinaryMetadataFileStore(
         final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache,
         final GridKernalContext ctx,
         final IgniteLogger log,
-        @Nullable final File binaryMetadataFileStoreDir
-    ) throws IgniteCheckedException {
+        final File workDir
+    ) {
         this.metadataLocCache = metadataLocCache;
         this.ctx = ctx;
         this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config());
         this.log = log;
+        this.workDir = workDir;
 
-        if (!CU.isPersistenceEnabled(ctx.config()))
+        if (!CU.isPersistenceEnabled(ctx.config())) {
             return;
+        }
 
         fileIOFactory = ctx.config().getDataStorageConfiguration().getFileIOFactory();
+    }
 
-        if (binaryMetadataFileStoreDir != null)
-            workDir = binaryMetadataFileStoreDir;
-        else {
-            final String subFolder = ctx.pdsFolderResolver().resolveFolders().folderName();
-
-            workDir = new File(U.resolveWorkDirectory(
-                ctx.config().getWorkDirectory(),
-                "binary_meta",
-                false
-            ),
-                subFolder);
-        }
-
+    /**
+     * Starts worker thread for async writing of binary metadata.
+     */
+    void start() throws IgniteCheckedException {
         U.ensureDirectory(workDir, "directory for serialized binary metadata", log);
 
         writer = new BinaryMetadataAsyncWriter();
+
         new IgniteThread(writer).start();
     }
 
@@ -145,7 +138,7 @@ class BinaryMetadataFileStore {
 
             U.error(log, msg);
 
-            writer.cancel();
+            U.cancel(writer);
 
             ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 46632b7..f114bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -125,6 +125,9 @@ import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
  * Binary processor implementation.
  */
 public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor {
+    /** Binary metadata file store folder. */
+    public static final String BINARY_META_FOLDER = "binary_meta";
+
     /** Immutable classes. */
     private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>();
 
@@ -201,11 +204,42 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
         marsh = ctx.grid().configuration().getMarshaller();
     }
 
+    /**
+     * @param igniteWorkDir Base Ignite working directory.
+     * @param consId Node consistent id, used as the name of the per-node subfolder.
+     * @return Directory for serialized binary metadata of the given node.
+     */
+    public static File resolveBinaryWorkDir(String igniteWorkDir, String consId) {
+        try {
+            File workDir = new File(U.resolveWorkDirectory(
+                igniteWorkDir,
+                BINARY_META_FOLDER,
+                false),
+                consId);
+
+            U.ensureDirectory(workDir, "directory for serialized binary metadata", null);
+
+            return workDir;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         if (marsh instanceof BinaryMarshaller) {
-            if (!ctx.clientNode())
-                metadataFileStore = new BinaryMetadataFileStore(metadataLocCache, ctx, log, binaryMetadataFileStoreDir);
+            if (!ctx.clientNode()) {
+                metadataFileStore = new BinaryMetadataFileStore(metadataLocCache,
+                    ctx,
+                    log,
+                    binaryMetadataFileStoreDir == null ?
+                        resolveBinaryWorkDir(ctx.config().getWorkDirectory(),
+                            ctx.pdsFolderResolver().resolveFolders().folderName()) :
+                        binaryMetadataFileStoreDir);
+
+                metadataFileStore.start();
+            }
 
             transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, ctx, log);
 
@@ -886,6 +920,23 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public void saveMetadata(Collection<BinaryType> types, File dir) {
+        try {
+            BinaryMetadataFileStore writer = new BinaryMetadataFileStore(new ConcurrentHashMap<>(),
+                ctx,
+                log,
+                resolveBinaryWorkDir(dir.getAbsolutePath(),
+                    ctx.pdsFolderResolver().resolveFolders().folderName()));
+
+            for (BinaryType type : types)
+                writer.mergeAndWriteMetadata(((BinaryTypeImpl)type).metadata());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
         A.notNullOrEmpty(typeName, "enum type name");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 926abdd..e907c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -140,6 +140,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
 import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
 import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
 import static org.apache.ignite.internal.util.IgniteUtils.doInParallelUninterruptibly;
 
@@ -799,7 +800,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             ExchangeType exchange;
 
             if (exchCtx.exchangeFreeSwitch()) {
-                exchange = onExchangeFreeSwitch();
+                exchange = isSnapshotOperation(firstDiscoEvt) ? onCustomMessageNoAffinityChange() :
+                    onExchangeFreeSwitchNodeLeft();
 
                 initCoordinatorCaches(newCrd);
             }
@@ -937,6 +939,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             for (PartitionsExchangeAware comp : cctx.exchange().exchangeAwareComponents())
                 comp.onInitAfterTopologyLock(this);
 
+            // For pme-free exchanges onInitAfterTopologyLock must be
+            // invoked prior to onDoneBeforeTopologyUnlock.
+            if (exchange == ExchangeType.ALL && context().exchangeFreeSwitch()) {
+                cctx.exchange().exchangerBlockingSectionBegin();
+
+                try {
+                    onDone(initialVersion());
+                }
+                finally {
+                    cctx.exchange().exchangerBlockingSectionEnd();
+                }
+            }
+
             if (exchLog.isInfoEnabled())
                 exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
         }
@@ -1404,7 +1419,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * @return Exchange type.
      */
-    private ExchangeType onExchangeFreeSwitch() {
+    private ExchangeType onExchangeFreeSwitchNodeLeft() {
         assert !firstDiscoEvt.eventNode().isClient() : this;
 
         assert firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED;
@@ -1511,7 +1526,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         boolean skipWaitOnLocalJoin = localJoinExchange()
             && cctx.exchange().latch().canSkipJoiningNodes(initialVersion());
 
-        if (context().exchangeFreeSwitch())
+        if (context().exchangeFreeSwitch() && isBaselineNodeFailed())
             waitPartitionRelease(true, false);
         else if (!skipWaitOnLocalJoin) { // Skip partition release if node has locally joined (it doesn't have any updates to be finished).
             boolean distributed = true;
@@ -1608,9 +1623,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         cctx.exchange().exchangerBlockingSectionBegin();
 
         try {
-            if (context().exchangeFreeSwitch())
-                onDone(initialVersion());
-            else {
+            if (!context().exchangeFreeSwitch()) {
                 if (crd.isLocal()) {
                     if (remaining.isEmpty()) {
                         initFut.onDone(true);
@@ -1719,7 +1732,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (distributed)
                 releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
 
-            partReleaseFut = context().exchangeFreeSwitch() ?
+            partReleaseFut = context().exchangeFreeSwitch() && isBaselineNodeFailed() ?
                 cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode()) :
                 cctx.partitionReleaseFuture(initialVersion());
 
@@ -2950,7 +2963,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (finishState0 == null) {
-                    assert firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient() : this;
+                    assert (firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient())
+                        || isSnapshotOperation(firstDiscoEvt) : GridDhtPartitionsExchangeFuture.this;
 
                     ClusterNode node = cctx.node(nodeId);
 
@@ -3957,6 +3971,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     Set<Integer> parts;
 
                     if (exchCtx.exchangeFreeSwitch()) {
+                        assert !isSnapshotOperation(firstDiscoEvt) : "Not allowed for taking snapshots: " + this;
+
                         // Previous topology to resolve failed primaries set.
                         AffinityTopologyVersion topVer = sharedContext().exchange().readyAffinityVersion();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index 7c6938e..433ef27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence;
 
 import java.util.concurrent.Executor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,6 +37,11 @@ public interface DbCheckpointListener {
         public boolean nextSnapshot();
 
         /**
+         * @return Checkpoint future which will be completed when checkpoint ends.
+         */
+        public IgniteInternalFuture<?> finishedStateFut();
+
+        /**
          * @return Partition allocation statistic map
          */
         public PartitionAllocationMap partitionStatMap();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 068fd5a..9d9433b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -68,7 +68,6 @@ import java.util.function.ToLongFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.ignite.DataRegionMetricsProvider;
 import org.apache.ignite.DataStorageMetrics;
 import org.apache.ignite.IgniteCheckedException;
@@ -159,8 +158,8 @@ import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.GridCountDownCallback;
 import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
+import org.apache.ignite.internal.util.GridCountDownCallback;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.StripedExecutor;
@@ -1603,6 +1602,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     ) {
         Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
 
+        cctx.snapshotMgr().onCacheGroupsStopped(stoppedGrps.stream()
+            .filter(IgniteBiTuple::get2)
+            .map(t -> t.get1().groupId())
+            .collect(Collectors.toList()));
+
         for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
             CacheGroupContext gctx = tup.get1();
 
@@ -3945,6 +3949,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             CheckpointProgressImpl curr = scheduledCp;
 
+            List<DbCheckpointListener> dbLsnrs = new ArrayList<>(lsnrs);
+
             CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);
 
             memoryRecoveryRecordPtr = null;
@@ -3964,7 +3970,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             internalReadLock();
 
             try {
-                for (DbCheckpointListener lsnr : lsnrs)
+                for (DbCheckpointListener lsnr : dbLsnrs)
                     lsnr.beforeCheckpointBegin(ctx0);
 
                 ctx0.awaitPendingTasksFinished();
@@ -3985,7 +3991,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 tracker.onMarkStart();
 
                 // Listeners must be invoked before we write checkpoint record to WAL.
-                for (DbCheckpointListener lsnr : lsnrs)
+                for (DbCheckpointListener lsnr : dbLsnrs)
                     lsnr.onMarkCheckpointBegin(ctx0);
 
                 ctx0.awaitPendingTasksFinished();
@@ -4038,7 +4044,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             curr.transitTo(LOCK_RELEASED);
 
-            for (DbCheckpointListener lsnr : lsnrs)
+            for (DbCheckpointListener lsnr : dbLsnrs)
                 lsnr.onCheckpointBegin(ctx);
 
             if (snapFut != null) {
@@ -4274,6 +4280,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 }
 
                 /** {@inheritDoc} */
+                @Override public IgniteInternalFuture<?> finishedStateFut() {
+                    return delegate.finishedStateFut();
+                }
+
+                /** {@inheritDoc} */
                 @Override public PartitionAllocationMap partitionStatMap() {
                     return delegate.partitionStatMap();
                 }
@@ -4383,6 +4394,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
 
             /** {@inheritDoc} */
+            @Override public IgniteInternalFuture<?> finishedStateFut() {
+                return curr.futureFor(FINISHED);
+            }
+
+            /** {@inheritDoc} */
             @Override public PartitionAllocationMap partitionStatMap() {
                 return map;
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index d9749f4..142d0a1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
@@ -25,6 +25,8 @@ import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,6 +36,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageWriteListener;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
@@ -87,6 +90,9 @@ public class FilePageStore implements PageStore {
     /** Region metrics updater. */
     private final LongAdderMetric allocatedTracker;
 
+    /** List of listeners for current page store to handle. */
+    private final List<PageWriteListener> lsnrs = new CopyOnWriteArrayList<>();
+
     /** */
     protected final int pageSize;
 
@@ -123,6 +129,16 @@ public class FilePageStore implements PageStore {
     }
 
     /** {@inheritDoc} */
+    @Override public void addWriteListener(PageWriteListener lsnr) {
+        lsnrs.add(lsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeWriteListener(PageWriteListener lsnr) {
+        lsnrs.remove(lsnr);
+    }
+
+    /** {@inheritDoc} */
     @Override public int getPageSize() {
         return pageSize;
     }
@@ -297,8 +313,11 @@ public class FilePageStore implements PageStore {
         return fileSize;
     }
 
-    /** {@inheritDoc} */
-    @Override public void stop(boolean delete) throws StorageException {
+    /**
+     * @param delete {@code True} to delete file.
+     * @throws IOException If fails.
+     */
+    private void stop0(boolean delete) throws IOException {
         lock.writeLock().lock();
 
         try {
@@ -324,10 +343,6 @@ public class FilePageStore implements PageStore {
                 fileExists = false;
             }
         }
-        catch (IOException e) {
-            throw new StorageException("Failed to stop serving partition file [file=" + getFileAbsolutePath()
-                + ", delete=" + delete + "]", e);
-        }
         finally {
             allocatedTracker.add(-1L * allocated.getAndSet(0) / pageSize);
 
@@ -338,6 +353,22 @@ public class FilePageStore implements PageStore {
     }
 
     /** {@inheritDoc} */
+    @Override public void stop(boolean delete) throws StorageException {
+        try {
+            stop0(delete);
+        }
+        catch (IOException e) {
+            throw new StorageException("Failed to stop serving partition file [file=" + getFileAbsolutePath()
+                + ", delete=" + delete + "]", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IOException {
+        stop0(false);
+    }
+
+    /** {@inheritDoc} */
     @Override public void truncate(int tag) throws StorageException {
         init();
 
@@ -433,7 +464,7 @@ public class FilePageStore implements PageStore {
     }
 
     /** {@inheritDoc} */
-    @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
+    @Override public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
         init();
 
         try {
@@ -453,7 +484,7 @@ public class FilePageStore implements PageStore {
             if (n < 0) {
                 pageBuf.put(new byte[pageBuf.remaining()]);
 
-                return;
+                return false;
             }
 
             int savedCrc32 = PageIO.getCrc(pageBuf);
@@ -478,6 +509,8 @@ public class FilePageStore implements PageStore {
 
             if (keepCrc)
                 PageIO.setCrc(pageBuf, savedCrc32);
+
+            return true;
         }
         catch (IOException e) {
             throw new StorageException("Failed to read page [file=" + getFileAbsolutePath() + ", pageId=" + pageId + "]", e);
@@ -501,7 +534,7 @@ public class FilePageStore implements PageStore {
     /**
      * @throws StorageException If failed to initialize store file.
      */
-    private void init() throws StorageException {
+    public void init() throws StorageException {
         if (!inited) {
             lock.writeLock().lock();
 
@@ -675,6 +708,12 @@ public class FilePageStore implements PageStore {
 
                     assert pageBuf.position() == 0 : pageBuf.position();
 
+                    for (PageWriteListener lsnr : lsnrs) {
+                        lsnr.accept(pageId, pageBuf);
+
+                        pageBuf.rewind();
+                    }
+
                     fileIO.writeFully(pageBuf, off);
 
                     PageIO.setCrc(pageBuf, 0);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index a6712db..2ee7e4d 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -37,12 +37,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -89,9 +92,12 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.String.format;
 import static java.nio.file.Files.delete;
 import static java.nio.file.Files.newDirectoryStream;
 import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
 
 /**
  * File page store manager.
@@ -137,6 +143,12 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     public static final PathMatcher TMP_FILE_MATCHER =
         FileSystems.getDefault().getPathMatcher("glob:**" + TMP_SUFFIX);
 
+    /** Listeners of configuration changes e.g. overwrite or remove actions. */
+    private final List<BiConsumer<String, File>> lsnrs = new CopyOnWriteArrayList<>();
+
+    /** Lock which guards configuration changes. */
+    private final ReentrantReadWriteLock chgLock = new ReentrantReadWriteLock();
+
     /** Marshaller. */
     private final Marshaller marshaller;
 
@@ -430,20 +442,18 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
     /** {@inheritDoc} */
     @Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException {
-        File cacheWorkDir = cacheWorkDir(cacheData.config());
-        File file;
+        CacheConfiguration<?, ?> ccfg = cacheData.config();
+        File cacheWorkDir = cacheWorkDir(ccfg);
 
         checkAndInitCacheWorkDir(cacheWorkDir);
 
         assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir;
 
-        if (cacheData.config().getGroupName() != null)
-            file = new File(cacheWorkDir, cacheData.config().getName() + CACHE_DATA_FILENAME);
-        else
-            file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
-
+        File file = cacheConfigurationFile(ccfg);
         Path filePath = file.toPath();
 
+        chgLock.readLock().lock();
+
         try {
             if (overwrite || !Files.exists(filePath) || Files.size(filePath) == 0) {
                 File tmp = new File(file.getParent(), file.getName() + TMP_SUFFIX);
@@ -458,16 +468,40 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
                     marshaller.marshal(cacheData, stream);
                 }
 
+                if (Files.exists(filePath) && Files.size(filePath) > 0) {
+                    for (BiConsumer<String, File> lsnr : lsnrs)
+                        lsnr.accept(ccfg.getName(), file);
+                }
+
                 Files.move(tmp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
             }
         }
         catch (IOException ex) {
             cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
 
-            throw new IgniteCheckedException("Failed to persist cache configuration: " + cacheData.config().getName(), ex);
+            throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex);
+        }
+        finally {
+            chgLock.readLock().unlock();
         }
     }
 
+    /**
+     * @param lsnr Instance of listener to add.
+     */
+    public void addConfigurationChangeListener(BiConsumer<String, File> lsnr) {
+        assert chgLock.isWriteLockedByCurrentThread();
+
+        lsnrs.add(lsnr);
+    }
+
+    /**
+     * @param lsnr Instance of listener to remove.
+     */
+    public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
+        lsnrs.remove(lsnr);
+    }
+
     /** {@inheritDoc} */
     @Override public void shutdownForCacheGroup(CacheGroupContext grp, boolean destroy) throws IgniteCheckedException {
         grpsWithoutIdx.remove(grp.groupId());
@@ -492,7 +526,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
     /** {@inheritDoc} */
     @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
-        assert partId <= PageIdAllocator.MAX_PARTITION_ID;
+        assert partId <= MAX_PARTITION_ID;
 
         PageStore store = getStore(grpId, partId);
 
@@ -651,6 +685,47 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     }
 
     /**
+     * @param grpId Cache group id.
+     * @param encrypted {@code true} if cache group encryption enabled.
+     * @return Factory to create page stores.
+     */
+    public FilePageStoreFactory getPageStoreFactory(int grpId, boolean encrypted) {
+        FileIOFactory pageStoreFileIoFactory = this.pageStoreFileIoFactory;
+        FileIOFactory pageStoreV1FileIoFactory = this.pageStoreV1FileIoFactory;
+
+        if (encrypted) {
+            pageStoreFileIoFactory = new EncryptedFileIOFactory(
+                this.pageStoreFileIoFactory,
+                grpId,
+                pageSize(),
+                cctx.kernalContext().encryption(),
+                cctx.gridConfig().getEncryptionSpi());
+
+            pageStoreV1FileIoFactory = new EncryptedFileIOFactory(
+                this.pageStoreV1FileIoFactory,
+                grpId,
+                pageSize(),
+                cctx.kernalContext().encryption(),
+                cctx.gridConfig().getEncryptionSpi());
+        }
+
+        FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
+            pageStoreFileIoFactory,
+            pageStoreV1FileIoFactory,
+            igniteCfg.getDataStorageConfiguration()
+        );
+
+        if (encrypted) {
+            int headerSize = pageStoreFactory.headerSize(pageStoreFactory.latestVersion());
+
+            ((EncryptedFileIOFactory)pageStoreFileIoFactory).headerSize(headerSize);
+            ((EncryptedFileIOFactory)pageStoreV1FileIoFactory).headerSize(headerSize);
+        }
+
+        return pageStoreFactory;
+    }
+
+    /**
      * @param cacheWorkDir Work directory.
      * @param grpId Group ID.
      * @param partitions Number of partitions.
@@ -672,43 +747,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
             if (dirExisted && !idxFile.exists())
                 grpsWithoutIdx.add(grpId);
 
-            FileIOFactory pageStoreFileIoFactory = this.pageStoreFileIoFactory;
-            FileIOFactory pageStoreV1FileIoFactory = this.pageStoreV1FileIoFactory;
-
-            if (encrypted) {
-                pageStoreFileIoFactory = new EncryptedFileIOFactory(
-                    this.pageStoreFileIoFactory,
-                    grpId,
-                    pageSize(),
-                    cctx.kernalContext().encryption(),
-                    cctx.gridConfig().getEncryptionSpi());
-
-                pageStoreV1FileIoFactory = new EncryptedFileIOFactory(
-                    this.pageStoreV1FileIoFactory,
-                    grpId,
-                    pageSize(),
-                    cctx.kernalContext().encryption(),
-                    cctx.gridConfig().getEncryptionSpi());
-            }
-
-            FileVersionCheckingFactory pageStoreFactory = new FileVersionCheckingFactory(
-                pageStoreFileIoFactory,
-                pageStoreV1FileIoFactory,
-                igniteCfg.getDataStorageConfiguration()
-            );
-
-            if (encrypted) {
-                int headerSize = pageStoreFactory.headerSize(pageStoreFactory.latestVersion());
-
-                ((EncryptedFileIOFactory)pageStoreFileIoFactory).headerSize(headerSize);
-                ((EncryptedFileIOFactory)pageStoreV1FileIoFactory).headerSize(headerSize);
-            }
+            FilePageStoreFactory pageStoreFactory = getPageStoreFactory(grpId, encrypted);
 
             PageStore idxStore =
-            pageStoreFactory.createPageStore(
-                PageMemory.FLAG_IDX,
-                idxFile,
-                allocatedTracker);
+                pageStoreFactory.createPageStore(
+                    PageMemory.FLAG_IDX,
+                    idxFile,
+                    allocatedTracker);
 
             PageStore[] partStores = new PageStore[partitions];
 
@@ -739,7 +784,27 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
      * @param partId Partition id.
      */
     @NotNull private Path getPartitionFilePath(File cacheWorkDir, int partId) {
-        return new File(cacheWorkDir, String.format(PART_FILE_TEMPLATE, partId)).toPath();
+        return new File(cacheWorkDir, getPartitionFileName(partId)).toPath();
+    }
+
+    /**
+     * @param workDir Cache work directory.
+     * @param cacheDirName Cache directory name.
+     * @param partId Partition id.
+     * @return Partition file.
+     */
+    @NotNull public static File getPartitionFile(File workDir, String cacheDirName, int partId) {
+        return new File(cacheWorkDir(workDir, cacheDirName), getPartitionFileName(partId));
+    }
+
+    /**
+     * @param partId Partition id.
+     * @return File name.
+     */
+    public static String getPartitionFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_FILE_NAME : format(PART_FILE_TEMPLATE, partId);
     }
 
     /** {@inheritDoc} */
@@ -856,7 +921,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
     /** {@inheritDoc} */
     @Override public long allocatePage(int grpId, int partId, byte flags) throws IgniteCheckedException {
-        assert partId <= PageIdAllocator.MAX_PARTITION_ID || partId == PageIdAllocator.INDEX_PARTITION;
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
 
         PageStore store = getStore(grpId, partId);
 
@@ -884,6 +949,36 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
         return store.pages();
     }
 
+    /**
+     * @param ccfgs List of cache configurations to process.
+     * @param ccfgCons Consumer which accepts found configurations files.
+     */
+    public void readConfigurationFiles(List<CacheConfiguration<?, ?>> ccfgs,
+        BiConsumer<CacheConfiguration<?, ?>, File> ccfgCons) {
+        chgLock.writeLock().lock();
+
+        try {
+            for (CacheConfiguration<?, ?> ccfg : ccfgs) {
+                File cacheDir = cacheWorkDir(ccfg);
+
+                if (!cacheDir.exists())
+                    continue;
+
+                File[] ccfgFiles = cacheDir.listFiles((dir, name) ->
+                    name.endsWith(CACHE_DATA_FILENAME));
+
+                if (ccfgFiles == null)
+                    continue;
+
+                for (File ccfgFile : ccfgFiles)
+                    ccfgCons.accept(ccfg, ccfgFile);
+            }
+        }
+        finally {
+            chgLock.writeLock().unlock();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException {
         if (cctx.kernalContext().clientNode())
@@ -997,24 +1092,45 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
      * @param ccfg Cache configuration.
      * @return Store dir for given cache.
      */
-    public File cacheWorkDir(CacheConfiguration ccfg) {
-        boolean isSharedGrp = ccfg.getGroupName() != null;
-
-        return cacheWorkDir(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
+    public File cacheWorkDir(CacheConfiguration<?, ?> ccfg) {
+        return cacheWorkDir(storeWorkDir, cacheDirName(ccfg));
     }
 
     /**
-     *
+     * @param isSharedGroup {@code True} if the name refers to a shared cache group rather than a single cache.
+     * @param cacheOrGroupName Cache name, or cache group name if {@code isSharedGroup} is {@code true}.
+     * @return Store directory for given cache.
      */
     public File cacheWorkDir(boolean isSharedGroup, String cacheOrGroupName) {
-        String dirName;
+        return cacheWorkDir(storeWorkDir, cacheDirName(isSharedGroup, cacheOrGroupName));
+    }
 
-        if (isSharedGroup)
-            dirName = CACHE_GRP_DIR_PREFIX + cacheOrGroupName;
-        else
-            dirName = CACHE_DIR_PREFIX + cacheOrGroupName;
+    /**
+     * @param cacheDirName Cache directory name.
+     * @return Store directory for given cache.
+     */
+    public static File cacheWorkDir(File storeWorkDir, String cacheDirName) {
+        return new File(storeWorkDir, cacheDirName);
+    }
+
+    /**
+     * @param isSharedGroup {@code True} if the name refers to a shared cache group rather than a single cache.
+     * @param cacheOrGroupName Cache name, or cache group name if {@code isSharedGroup} is {@code true}.
+     * @return The full cache directory name.
+     */
+    public static String cacheDirName(boolean isSharedGroup, String cacheOrGroupName) {
+        return isSharedGroup ? CACHE_GRP_DIR_PREFIX + cacheOrGroupName
+            : CACHE_DIR_PREFIX + cacheOrGroupName;
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @return The full cache directory name.
+     */
+    public static String cacheDirName(CacheConfiguration<?, ?> ccfg) {
+        boolean isSharedGrp = ccfg.getGroupName() != null;
 
-        return new File(storeWorkDir, dirName);
+        return cacheDirName(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
     }
 
     /**
@@ -1075,22 +1191,37 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
 
     /** {@inheritDoc} */
     @Override public void removeCacheData(StoredCacheData cacheData) throws IgniteCheckedException {
-        CacheConfiguration cacheCfg = cacheData.config();
-        File cacheWorkDir = cacheWorkDir(cacheCfg);
-        File file;
+        chgLock.readLock().lock();
 
-        if (cacheData.config().getGroupName() != null)
-            file = new File(cacheWorkDir, cacheCfg.getName() + CACHE_DATA_FILENAME);
-        else
-            file = new File(cacheWorkDir, CACHE_DATA_FILENAME);
+        try {
+            CacheConfiguration<?, ?> ccfg = cacheData.config();
+            File file = cacheConfigurationFile(ccfg);
 
-        if (file.exists()) {
-            if (!file.delete())
-                throw new IgniteCheckedException("Failed to delete cache configuration: " + cacheCfg.getName());
+            if (file.exists()) {
+                for (BiConsumer<String, File> lsnr : lsnrs)
+                    lsnr.accept(ccfg.getName(), file);
+
+                if (!file.delete())
+                    throw new IgniteCheckedException("Failed to delete cache configuration: " + ccfg.getName());
+            }
+        }
+        finally {
+            chgLock.readLock().unlock();
         }
     }
 
     /**
+     * @param ccfg Cache configuration.
+     * @return Cache configuration file with respect to {@link CacheConfiguration#getGroupName} value.
+     */
+    private File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
+        File cacheWorkDir = cacheWorkDir(ccfg);
+
+        return ccfg.getGroupName() == null ? new File(cacheWorkDir, CACHE_DATA_FILENAME) :
+            new File(cacheWorkDir, ccfg.getName() + CACHE_DATA_FILENAME);
+    }
+
+    /**
      * @param store Store to shutdown.
      * @param cleanFile {@code True} if files should be cleaned.
      * @param aggr Aggregating exception.
@@ -1167,10 +1298,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
             throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
                 "(cache has not been started): " + grpId);
 
-        if (partId == PageIdAllocator.INDEX_PARTITION)
+        if (partId == INDEX_PARTITION)
             return holder.idxStore;
 
-        if (partId > PageIdAllocator.MAX_PARTITION_ID)
+        if (partId > MAX_PARTITION_ID)
             throw new IgniteCheckedException("Partition ID is reserved: " + partId);
 
         PageStore store = holder.partStores[partId];
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
index dbdf670..c236827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/partstate/GroupPartitionId.java
@@ -52,7 +52,7 @@ public class GroupPartitionId implements Comparable<GroupPartitionId> {
      * @param partId Partition ID.
      * @return flag to be used for partition
      */
-    private static byte getFlagByPartId(final int partId) {
+    public static byte getFlagByPartId(final int partId) {
         return partId == PageIdAllocator.INDEX_PARTITION ? PageMemory.FLAG_IDX : PageMemory.FLAG_DATA;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
index 21849b3..c377582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
@@ -36,7 +35,10 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Snapshot manager stub.
+ *
+ * @deprecated Use {@link IgniteSnapshotManager}.
  */
+@Deprecated
 public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends GridCacheSharedManagerAdapter implements IgniteChangeGlobalStateSupport {
     /** Snapshot started lock filename. */
     public static final String SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME = "snapshot-started.loc";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
new file mode 100644
index 0000000..ae8203a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -0,0 +1,1233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSnapshot;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteFeatures;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.ignite.thread.OomExceptionHandler;
+import org.jetbrains.annotations.Nullable;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.cluster.ClusterState.active;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/**
+ * Internal implementation of snapshot operations over persistence caches.
+ * <p>
+ * The following major actions are available:
+ * <ul>
+ *     <li>Create snapshot of the whole cluster cache groups by triggering PME to achieve consistency.</li>
+ * </ul>
+ */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
+    implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener {
+    /** File with delta pages suffix. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template for partition files consisting of delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+    /** Reason text for a checkpoint started to enforce a snapshot operation. */
+    public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
+
+    /** Default snapshot directory for loading remote snapshots. */
+    public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
+
+    /** Snapshot in progress error message. */
+    public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
+
+    /** Error message to finalize snapshot tasks. */
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " +
+        "is stopping";
+
+    /** Metastorage key to save currently running snapshot. */
+    public static final String SNP_RUNNING_KEY = "snapshot-running";
+
+    /** Snapshot metrics prefix. */
+    public static final String SNAPSHOT_METRICS = "snapshot";
+
+    /** Prefix for snapshot threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+    /** Total number of threads to perform local snapshot. */
+    private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
+
+    /**
+     * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
+     * It is important to have only one buffer per thread: creating a separate buffer for each
+     * {@code SnapshotFutureTask.PageStoreSerialWriter} is redundant and can lead to OOM errors. A direct buffer is
+     * deallocated only when its ByteBuffer is garbage collected, so off-heap memory can run out before that happens.
+     */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** Map of registered cache snapshot processes and their corresponding contexts. */
+    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+
+    /** Lock to protect the resources in use. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Mutex used to order cluster snapshot operation progress. */
+    private final Object snpOpMux = new Object();
+
+    /** Take snapshot operation procedure. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc;
+
+    /** Check previously performed snapshot operation and delete uncompleted files if needed. */
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc;
+
+    /** Resolved persistent data storage settings. */
+    private volatile PdsFolderSettings pdsSettings;
+
+    /** Fully initialized metastorage. */
+    private volatile ReadWriteMetastorage metaStorage;
+
+    /** Local snapshot sender factory. */
+    private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
+
+    /** Main snapshot directory to save created snapshots. */
+    private volatile File locSnpDir;
+
+    /**
+     * Working directory for loaded snapshots from the remote nodes and storing
+     * temporary partition delta-files of locally started snapshot process.
+     */
+    private File tmpWorkDir;
+
+    /** Factory to working with delta as file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+    /** Snapshot thread pool to perform local partition snapshots. */
+    private ExecutorService snpRunner;
+
+    /** System discovery message listener. */
+    private DiscoveryEventListener discoLsnr;
+
+    /** Cluster snapshot operation requested by user. */
+    private ClusterSnapshotFuture clusterSnpFut;
+
+    /** Current snapshot operation on local node. */
+    private volatile SnapshotOperationRequest clusterSnpReq;
+
+    /** {@code true} if recovery process occurred for snapshot. */
+    private volatile boolean recovered;
+
+    /** Last seen cluster snapshot operation. */
+    private volatile ClusterSnapshotFuture lastSeenSnpFut = new ClusterSnapshotFuture();
+
+    /**
+     * Creates the snapshot manager and wires up the distributed processes which drive the start and
+     * the end stages of a cluster snapshot. The per-thread page buffer is allocated lazily, on the first
+     * access made by each thread.
+     *
+     * @param ctx Kernal context.
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        this.locBuff = ThreadLocal.withInitial(() -> {
+            int pageSize = ctx.config().getDataStorageConfiguration().getPageSize();
+
+            return ByteBuffer.allocateDirect(pageSize).order(ByteOrder.nativeOrder());
+        });
+
+        this.startSnpProc = new DistributedProcess<>(
+            ctx,
+            START_SNAPSHOT,
+            this::initLocalSnapshotStartStage,
+            this::processLocalSnapshotStartStageResult,
+            SnapshotStartDiscoveryMessage::new);
+
+        this.endSnpProc = new DistributedProcess<>(
+            ctx,
+            END_SNAPSHOT,
+            this::initLocalSnapshotEndStage,
+            this::processLocalSnapshotEndStageResult);
+    }
+
+    /**
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File partDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, partDeltaFileName(partId));
+    }
+
+    /**
+     * Builds the name of the file holding delta pages of the given partition. The index partition
+     * has a dedicated file name, all other partitions are named by the delta template.
+     *
+     * @param partId Partition id.
+     * @return File name of delta partition pages.
+     */
+    public static String partDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        if (partId == INDEX_PARTITION)
+            return INDEX_DELTA_NAME;
+
+        return String.format(PART_DELTA_TEMPLATE, partId);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Does nothing beyond the superclass start on client nodes and on nodes without persistence enabled.
+     * Otherwise it creates the snapshot runner thread pool, resolves and ensures the snapshot and temporary
+     * working directories, registers the snapshot metrics, subscribes this manager as an exchange-aware
+     * component and a metastorage listener, and installs a discovery listener. On a node left/failed event
+     * the listener passes a topology exception to every local snapshot task whose source node is the left
+     * node, or which belongs to the current cluster snapshot request having the left node in its baseline.
+     *
+     * @throws IgniteCheckedException If thrown by the superclass start or by one of the folder
+     *      resolution/creation calls.
+     */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext ctx = cctx.kernalContext();
+
+        if (ctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(ctx.config()))
+            return;
+
+        snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THREAD_POOL_SIZE,
+            SNAPSHOT_THREAD_POOL_SIZE,
+            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            new OomExceptionHandler(ctx));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders();
+
+        locSnpDir = resolveSnapshotWorkDirectory(ctx.config());
+        tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true);
+
+        U.ensureDirectory(locSnpDir, "snapshot work directory", log);
+        U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
+
+        MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
+
+        mreg.register("LastSnapshotStartTime", () -> lastSeenSnpFut.startTime,
+            "The system time of the last cluster snapshot request start time on this node.");
+        mreg.register("LastSnapshotEndTime", () -> lastSeenSnpFut.endTime,
+            "The system time of the last cluster snapshot request end time on this node.");
+        mreg.register("LastSnapshotName", () -> lastSeenSnpFut.name, String.class,
+            "The name of last started cluster snapshot request on this node.");
+        mreg.register("LastSnapshotErrorMessage",
+            () -> lastSeenSnpFut.error() == null ? "" : lastSeenSnpFut.error().getMessage(),
+            String.class,
+            "The error message of last started cluster snapshot request which fail with an error. " +
+                "This value will be empty if last snapshot request has been completed successfully.");
+        mreg.register("LocalSnapshotNames", this::localSnapshotNames, List.class,
+            "The list of names of all snapshots currently saved on the local node with respect to " +
+                "the configured via IgniteConfiguration snapshot working path.");
+
+        storeFactory = storeMgr::getPageStoreFactory;
+
+        cctx.exchange().registerExchangeAwareComponent(this);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+
+        cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                UUID leftNodeId = evt.eventNode().id();
+
+                if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) {
+                    SnapshotOperationRequest snpReq = clusterSnpReq;
+
+                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                        if (sctx.sourceNodeId().equals(leftNodeId) ||
+                            (snpReq != null &&
+                                snpReq.snpName.equals(sctx.snapshotName()) &&
+                                snpReq.bltNodes.contains(leftNodeId))) {
+                            sctx.acceptException(new ClusterTopologyCheckedException("Snapshot operation interrupted. " +
+                                "One of baseline nodes left the cluster: " + leftNodeId));
+                        }
+                    }
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Blocks the busy lock, passes {@code NodeStoppingException} to every registered local snapshot task,
+     * completes the pending user cluster snapshot future (if any) with the same kind of error, shuts the
+     * snapshot runner pool down, removes the discovery listener and unregisters this manager as
+     * an exchange-aware component. The busy lock is unblocked again when the method exits.
+     *
+     * @param cancel Cancel flag; not read by this implementation.
+     */
+    @Override protected void stop0(boolean cancel) {
+        busyLock.block();
+
+        try {
+            // Try stop all snapshot processing if not yet.
+            for (SnapshotFutureTask sctx : locSnpTasks.values())
+                sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+            locSnpTasks.clear();
+
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null) {
+                    clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
+
+                    clusterSnpFut = null;
+                }
+            }
+
+            if (snpRunner != null)
+                snpRunner.shutdownNow();
+
+            if (discoLsnr != null)
+                cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
+
+            cctx.exchange().unregisterExchangeAwareComponent(this);
+        }
+        finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Deletes the files of the given local node from the snapshot directory: its binary metadata directory,
+     * its database directory and every file under the snapshot marshaller directory. The snapshot directory
+     * itself is removed when its database folder does not exist or is left empty.
+     *
+     * @param snpDir Snapshot dir.
+     * @param folderName Local node folder name (see {@link U#maskForFileName} with consistent id).
+     * @throws IgniteException If directory resolution or the marshaller directory traversal fails.
+     */
+    public static void deleteSnapshot(File snpDir, String folderName) {
+        if (!snpDir.exists())
+            return;
+
+        assert snpDir.isDirectory() : snpDir;
+
+        try {
+            File binDir = resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName);
+            File dbDir = U.resolveWorkDirectory(snpDir.getAbsolutePath(), databaseRelativePath(folderName), false);
+
+            U.delete(binDir);
+            U.delete(dbDir);
+
+            File marshDir = mappingFileStoreWorkDir(snpDir.getAbsolutePath());
+
+            // Concurrently traverse the snapshot marshaller directory and delete all files.
+            Files.walkFileTree(marshDir.toPath(), new SimpleFileVisitor<Path>() {
+                @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
+                    U.delete(file);
+
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override public FileVisitResult visitFileFailed(Path file, IOException exc) {
+                    // Skip files which can be concurrently removed from FileTree.
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+
+            File db = new File(snpDir, DB_DEFAULT_FOLDER);
+
+            boolean rmvSnpDir;
+
+            if (!db.exists())
+                rmvSnpDir = true;
+            else {
+                // File#list returns null if 'db' is not a directory or an I/O error occurs. In that case
+                // the emptiness of the folder is unknown, so the snapshot directory is kept.
+                String[] dbEntries = db.list();
+
+                rmvSnpDir = dbEntries != null && dbEntries.length == 0;
+            }
+
+            if (rmvSnpDir)
+                U.delete(snpDir);
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Resolves the directory of the snapshot with the given name under the local snapshot root.
+     *
+     * @param snpName Snapshot name.
+     * @return Local snapshot directory for snapshot with given name.
+     */
+    public File snapshotLocalDir(String snpName) {
+        File snpRoot = locSnpDir;
+
+        assert snpRoot != null;
+        assert U.alphanumericUnderscore(snpName) : snpName;
+
+        return new File(snpRoot, snpName);
+    }
+
+    /**
+     * Gives access to the temporary working directory of the snapshot manager.
+     *
+     * @return Node snapshot working directory.
+     */
+    public File snapshotTmpDir() {
+        File dir = tmpWorkDir;
+
+        assert dir != null;
+
+        return dir;
+    }
+
+    /**
+     * Local handler of the snapshot start stage. Validates the request against the local node state and,
+     * if the validation passes, registers the local snapshot task and remembers the request as the current
+     * cluster snapshot operation.
+     * <p>
+     * Client nodes and nodes outside of the baseline complete immediately with an empty result. The returned
+     * future is completed with an error if another snapshot request is already remembered on this node,
+     * if some baseline nodes listed in the request are not among the current server nodes, or if some
+     * requested cache groups have no descriptor on the local node. If none of the requested groups has
+     * a local cache group context, the future completes with an empty result and no task is registered.
+     *
+     * @param req Request on snapshot creation.
+     * @return Future which will be completed when a snapshot has been started.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartStage(SnapshotOperationRequest req) {
+        if (cctx.kernalContext().clientNode() ||
+            !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()))
+            return new GridFinishedFuture<>();
+
+        // Executed inside discovery notifier thread, prior to firing discovery custom event,
+        // so it is safe to set new snapshot task inside this method without synchronization.
+        if (clusterSnpReq != null) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " +
+                "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']'));
+        }
+
+        Set<UUID> leftNodes = new HashSet<>(req.bltNodes);
+        leftNodes.removeAll(F.viewReadOnly(cctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
+            F.node2id()));
+
+        if (!leftNodes.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of baseline nodes left the cluster " +
+                "prior to snapshot operation start: " + leftNodes));
+        }
+
+        Set<Integer> leftGrps = new HashSet<>(req.grpIds);
+        leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet());
+
+        if (!leftGrps.isEmpty()) {
+            return new GridFinishedFuture<>(new IgniteCheckedException("Some of requested cache groups doesn't exist " +
+                "on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
+        }
+
+        Map<Integer, Set<Integer>> parts = new HashMap<>();
+
+        // Prepare collection of pairs group and appropriate cache partition to be snapshot.
+        // Cache group context may be 'null' on some nodes e.g. a node filter is set.
+        for (Integer grpId : req.grpIds) {
+            if (cctx.cache().cacheGroup(grpId) == null)
+                continue;
+
+            parts.put(grpId, null);
+        }
+
+        if (parts.isEmpty())
+            return new GridFinishedFuture<>();
+
+        SnapshotFutureTask task0 = registerSnapshotTask(req.snpName,
+            req.srcNodeId,
+            parts,
+            locSndrFactory.apply(req.snpName));
+
+        clusterSnpReq = req;
+
+        return task0.chain(fut -> {
+            if (fut.error() == null)
+                return new SnapshotOperationResponse();
+            else
+                throw new GridClosureException(fut.error());
+        });
+    }
+
+    /**
+     * Handles the collected results of the snapshot start stage. Does nothing on client nodes.
+     * <p>
+     * If the local node has no current snapshot request, or the current one has a different request id,
+     * the user future with the matching request id (if any) is completed with an error and cleared.
+     * Otherwise, on the coordinator only, the request is flagged as failed when any node reported an error
+     * or some baseline node returned neither a result nor an error, and the end stage is started.
+     *
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotStartStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        if (cctx.kernalContext().clientNode())
+            return;
+
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null || !snpReq.rqId.equals(id)) {
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null && clusterSnpFut.rqId.equals(id)) {
+                    clusterSnpFut.onDone(new IgniteCheckedException("Snapshot operation has not been fully completed " +
+                        "[err=" + err + ", snpReq=" + snpReq + ']'));
+
+                    clusterSnpFut = null;
+                }
+
+                return;
+            }
+        }
+
+        if (isLocalNodeCoordinator(cctx.discovery())) {
+            Set<UUID> missed = new HashSet<>(snpReq.bltNodes);
+            missed.removeAll(res.keySet());
+            missed.removeAll(err.keySet());
+
+            snpReq.hasErr = !F.isEmpty(err) || !missed.isEmpty();
+
+            if (snpReq.hasErr) {
+                U.warn(log, "Execution of local snapshot tasks fails or them haven't been executed " +
+                    "due to some of nodes left the cluster. Uncompleted snapshot will be deleted " +
+                    "[err=" + err + ", missed=" + missed + ']');
+            }
+
+            endSnpProc.start(UUID.randomUUID(), snpReq);
+        }
+    }
+
+    /**
+     * Local handler of the snapshot end stage. When a snapshot request is remembered on this node,
+     * the files of a failed snapshot are deleted and the running snapshot key is removed from
+     * the metastorage; any exception raised by that cleanup becomes the result of the returned future.
+     *
+     * @param req Request on snapshot creation.
+     * @return Future which will be completed when the snapshot will be finalized.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotEndStage(SnapshotOperationRequest req) {
+        if (clusterSnpReq != null) {
+            try {
+                if (req.hasErr)
+                    deleteSnapshot(snapshotLocalDir(req.snpName), pdsSettings.folderName());
+
+                removeLastMetaStorageKey();
+            }
+            catch (Exception e) {
+                return new GridFinishedFuture<>(e);
+            }
+        }
+
+        return new GridFinishedFuture<>(new SnapshotOperationResponse());
+    }
+
+    /**
+     * Handles the collected results of the snapshot end stage: forgets the current snapshot request and
+     * completes the user future (if there is one on this node) either successfully or with an error.
+     *
+     * @param id Request id.
+     * @param res Results.
+     * @param err Errors.
+     */
+    private void processLocalSnapshotEndStageResult(UUID id, Map<UUID, SnapshotOperationResponse> res, Map<UUID, Exception> err) {
+        SnapshotOperationRequest finishedReq = clusterSnpReq;
+
+        if (finishedReq == null)
+            return;
+
+        // Baseline nodes which have not reported a result of the end stage.
+        Set<UUID> notFinalized = new HashSet<>(finishedReq.bltNodes);
+        notFinalized.removeAll(res.keySet());
+
+        clusterSnpReq = null;
+
+        synchronized (snpOpMux) {
+            if (clusterSnpFut == null)
+                return;
+
+            boolean failed = !notFinalized.isEmpty() || finishedReq.hasErr;
+
+            if (failed) {
+                clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " +
+                    "Local snapshot tasks may not finished completely or finalizing results fails " +
+                    "[hasErr=" + finishedReq.hasErr + ", fail=" + notFinalized + ", err=" + err + ']'));
+            }
+            else {
+                clusterSnpFut.onDone();
+
+                if (log.isInfoEnabled())
+                    log.info("Cluster-wide snapshot operation finished successfully [req=" + finishedReq + ']');
+            }
+
+            clusterSnpFut = null;
+        }
+    }
+
+    /**
+     * Checks whether a snapshot request or a user snapshot future is currently registered on this node.
+     *
+     * @return {@code True} if snapshot operation is in progress.
+     */
+    public boolean isSnapshotCreating() {
+        // Fast path without taking the mutex.
+        if (clusterSnpReq == null) {
+            synchronized (snpOpMux) {
+                return clusterSnpReq != null || clusterSnpFut != null;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @return List of all known snapshots on the local node.
+     */
+    public List<String> localSnapshotNames() {
+        if (cctx.kernalContext().clientNode())
+            throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+        synchronized (snpOpMux) {
+            return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+                .map(File::getName)
+                .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The returned future is completed with an error on client nodes, when not all nodes support the snapshot
+     * feature, when the cluster is inactive or has no baseline topology, when another snapshot operation is
+     * still in progress, or when a snapshot directory with the given name already exists locally. Only
+     * persistent user cache groups without encryption are included into the request. Every failure is
+     * also recorded as the last seen snapshot operation.
+     * <p>
+     * NOTE(review): the catch block does not reset {@code clusterSnpFut}; if an exception is thrown after
+     * the future has been assigned, nothing in this method clears it - confirm this is intended.
+     *
+     * @param name Snapshot name; must be non-empty and consist of {@code a-zA-Z0-9_} characters.
+     * @return Future of the cluster-wide snapshot operation.
+     */
+    @Override public IgniteFuture<Void> createSnapshot(String name) {
+        A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
+        A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+
+        try {
+            if (cctx.kernalContext().clientNode())
+                throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+            if (!IgniteFeatures.allNodesSupports(cctx.discovery().allNodes(), PERSISTENCE_CACHE_SNAPSHOT))
+                throw new IgniteException("Not all nodes in the cluster support a snapshot operation.");
+
+            if (!active(cctx.kernalContext().state().clusterState().state()))
+                throw new IgniteException("Snapshot operation has been rejected. The cluster is inactive.");
+
+            DiscoveryDataClusterState clusterState = cctx.kernalContext().state().clusterState();
+
+            if (!clusterState.hasBaselineTopology())
+                throw new IgniteException("Snapshot operation has been rejected. The baseline topology is not configured for cluster.");
+
+            ClusterSnapshotFuture snpFut0;
+
+            synchronized (snpOpMux) {
+                if (clusterSnpFut != null && !clusterSnpFut.isDone())
+                    throw new IgniteException("Create snapshot request has been rejected. The previous snapshot operation was not completed.");
+
+                if (clusterSnpReq != null)
+                    throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.");
+
+                if (localSnapshotNames().contains(name))
+                    throw new IgniteException("Create snapshot request has been rejected. Snapshot with given name already exists on local node.");
+
+                snpFut0 = new ClusterSnapshotFuture(UUID.randomUUID(), name);
+
+                clusterSnpFut = snpFut0;
+                lastSeenSnpFut = snpFut0;
+            }
+
+            List<Integer> grps = cctx.cache().persistentGroups().stream()
+                .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER)
+                .filter(g -> !g.config().isEncryptionEnabled())
+                .map(CacheGroupDescriptor::groupId)
+                .collect(Collectors.toList());
+
+            List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
+
+            startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest(snpFut0.rqId,
+                cctx.localNodeId(),
+                name,
+                grps,
+                new HashSet<>(F.viewReadOnly(srvNodes,
+                    F.node2id(),
+                    (node) -> CU.baselineNode(node, clusterState)))));
+
+            if (log.isInfoEnabled())
+                log.info("Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']');
+
+            return new IgniteFutureImpl<>(snpFut0);
+        }
+        catch (Exception e) {
+            U.error(log, "Start snapshot operation failed", e);
+
+            lastSeenSnpFut = new ClusterSnapshotFuture(name, e);
+
+            return new IgniteFinishedFutureImpl<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForReadWrite(ReadWriteMetastorage metaStorage) throws IgniteCheckedException {
+        synchronized (snpOpMux) {
+            this.metaStorage = metaStorage;
+
+            if (recovered)
+                removeLastMetaStorageKey();
+
+            recovered = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReadyForRead(ReadOnlyMetastorage metaStorage) throws IgniteCheckedException {
+        // Snapshot which has not been completed due to the local node crashed must be deleted.
+        String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
+
+        if (snpName == null)
+            return;
+
+        recovered = true;
+
+        for (File tmp : snapshotTmpDir().listFiles())
+            U.delete(tmp);
+
+        deleteSnapshot(snapshotLocalDir(snpName), pdsSettings.folderName());
+
+        if (log.isInfoEnabled()) {
+            log.info("Previous attempt to create snapshot fail due to the local node crash. All resources " +
+                "related to snapshot operation have been deleted: " + snpName);
+        }
+    }
+
+    /**
+     * Tells whether the given discovery event is a custom event carrying the snapshot start message
+     * and originated from a non-client node.
+     *
+     * @param evt Discovery event to check.
+     * @return {@code true} if exchange started by snapshot operation.
+     */
+    public static boolean isSnapshotOperation(DiscoveryEvent evt) {
+        if (evt.eventNode().isClient())
+            return false;
+
+        if (evt.type() != EVT_DISCOVERY_CUSTOM_EVT)
+            return false;
+
+        return ((DiscoveryCustomEvent)evt).customMessage() instanceof SnapshotStartDiscoveryMessage;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If a cluster snapshot request is in progress on this (non-client) node and its local task is
+     * registered, starts the task, forces a checkpoint and waits until the task reports that it has started.
+     * A failure of that wait is only logged.
+     */
+    @Override public void onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // Read the volatile field once: it may be concurrently reset to null after a null check.
+        SnapshotOperationRequest snpReq = clusterSnpReq;
+
+        if (snpReq == null || cctx.kernalContext().clientNode())
+            return;
+
+        SnapshotFutureTask task = locSnpTasks.get(snpReq.snpName);
+
+        if (task == null)
+            return;
+
+        if (task.start()) {
+            cctx.database().forceCheckpoint(String.format("Start snapshot operation: %s", snpReq.snpName));
+
+            // Schedule task on a checkpoint and wait when it starts.
+            try {
+                task.awaitStarted();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Fail to wait while cluster-wide snapshot operation started", e);
+            }
+        }
+    }
+
+    /**
+     * Passes an exception to every registered local snapshot task which affects at least one
+     * of the given cache groups.
+     *
+     * @param grps List of cache groups which will be destroyed.
+     */
+    public void onCacheGroupsStopped(List<Integer> grps) {
+        for (SnapshotFutureTask task : locSnpTasks.values()) {
+            // Stopped groups which the task is working with.
+            Set<Integer> stopped = new HashSet<>(grps);
+            stopped.retainAll(task.affectedCacheGroups());
+
+            if (stopped.isEmpty())
+                continue;
+
+            task.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
+                "cache groups stopped: " + stopped));
+        }
+    }
+
+    /**
+     * Creates a snapshot task and registers it under the snapshot name. The task is unregistered
+     * automatically when it completes. If the manager is stopping or a task with the same name is already
+     * registered, a task already completed with the corresponding error is returned instead.
+     *
+     * @param snpName Unique snapshot name.
+     * @param srcNodeId Node id which cause snapshot operation.
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param snpSndr Snapshot sender passed to the created task.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+     */
+    SnapshotFutureTask registerSnapshotTask(
+        String snpName,
+        UUID srcNodeId,
+        Map<Integer, Set<Integer>> parts,
+        SnapshotSender snpSndr
+    ) {
+        if (!busyLock.enterBusy())
+            return new SnapshotFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+
+        try {
+            // Cheap pre-check which avoids constructing a task that would be discarded anyway.
+            if (locSnpTasks.containsKey(snpName))
+                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+
+            SnapshotFutureTask snpFutTask = new SnapshotFutureTask(cctx,
+                srcNodeId,
+                snpName,
+                tmpWorkDir,
+                ioFactory,
+                snpSndr,
+                parts,
+                locBuff);
+
+            // The same name could have been registered concurrently after the check above.
+            if (locSnpTasks.putIfAbsent(snpName, snpFutTask) != null)
+                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+
+            if (log.isInfoEnabled()) {
+                // Log the registered task itself, not the manager.
+                log.info("Snapshot task has been registered on local node [sctx=" + snpFutTask +
+                    ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+            }
+
+            snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+
+            return snpFutTask;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Replaces the factory used to create snapshot senders for locally stored snapshots.
+     *
+     * @param factory Factory which produces {@link LocalSnapshotSender} implementation.
+     */
+    void localSnapshotSenderFactory(Function<String, SnapshotSender> factory) {
+        this.locSndrFactory = factory;
+    }
+
+    /**
+     * Gives access to the factory currently used to create snapshot senders for locally stored snapshots.
+     *
+     * @return Factory which produces {@link LocalSnapshotSender} implementation.
+     */
+    Function<String, SnapshotSender> localSnapshotSenderFactory() {
+        return this.locSndrFactory;
+    }
+
+    /**
+     * Snapshot finished successfully or already restored. Key can be removed.
+     * The removal of {@code SNP_RUNNING_KEY} from the metastorage is performed under the checkpoint read lock,
+     * which is released in any case.
+     *
+     * @throws IgniteCheckedException If the metastorage removal fails.
+     */
+    private void removeLastMetaStorageKey() throws IgniteCheckedException {
+        cctx.database().checkpointReadLock();
+
+        try {
+            metaStorage.remove(SNP_RUNNING_KEY);
+        }
+        finally {
+            cctx.database().checkpointReadUnlock();
+        }
+    }
+
+    /**
+     * Gives access to the snapshot runner thread pool.
+     *
+     * @return The executor used to run snapshot tasks.
+     */
+    Executor snapshotExecutorService() {
+        Executor runner = snpRunner;
+
+        assert runner != null;
+
+        return runner;
+    }
+
+    /**
+     * Replaces the file I/O factory of the manager. The factory is handed to newly registered snapshot tasks
+     * and is used by the local snapshot sender to read and copy files.
+     *
+     * @param ioFactory Factory to create IO interface over a page stores.
+     */
+    void ioFactory(FileIOFactory ioFactory) {
+        this.ioFactory = ioFactory;
+    }
+
+    /**
+     * Joins the default database folder name with the given node folder name.
+     *
+     * @param folderName Local node folder name.
+     * @return Relative configured path of persistence data storage directory for the local node.
+     * Example: {@code snapshotWorkDir/db/IgniteNodeName0}
+     */
+    static String databaseRelativePath(String folderName) {
+        Path relPath = Paths.get(DB_DEFAULT_FOLDER, folderName);
+
+        return relPath.toString();
+    }
+
+    /**
+     * Resolves the configured snapshot path against the configured work directory.
+     * A checked resolution failure is rethrown as {@link IgniteException}.
+     *
+     * @param cfg Ignite configuration.
+     * @return Snapshot directory resolved through given configuration.
+     */
+    static File resolveSnapshotWorkDirectory(IgniteConfiguration cfg) {
+        try {
+            String workDir = cfg.getWorkDirectory();
+            String snpPath = cfg.getSnapshotPath();
+
+            return U.resolveWorkDirectory(workDir, snpPath, false);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Copies the first {@code length} bytes of the source file to the destination file.
+     *
+     * @param factory Factory to produce FileIO access.
+     * @param from Copy from file.
+     * @param to Copy data to file.
+     * @param length Number of bytes to copy from beginning.
+     * @throws IgniteException If the source file is shorter than {@code length}, if the transfer makes
+     *      no progress, or if an I/O error occurs.
+     */
+    static void copy(FileIOFactory factory, File from, File to, long length) {
+        try (FileIO src = factory.create(from, READ);
+             FileChannel dest = new FileOutputStream(to).getChannel()) {
+            long srcSize = src.size();
+
+            if (srcSize < length) {
+                throw new IgniteException("The source file to copy has not enough length " +
+                    "[expected=" + length + ", actual=" + srcSize + ']');
+            }
+
+            src.position(0);
+
+            long written = 0;
+
+            while (written < length) {
+                long transferred = src.transferTo(written, length - written, dest);
+
+                // No progress most likely means the source has shrunk after the size check above;
+                // fail explicitly instead of spinning forever.
+                if (transferred <= 0) {
+                    throw new IgniteException("Failed to copy the source file, no bytes have been transferred " +
+                        "[from=" + from + ", to=" + to + ", expected=" + length + ", written=" + written + ']');
+                }
+
+                written += transferred;
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Snapshot sender which writes all data to local directory.
+     */
+    private class LocalSnapshotSender extends SnapshotSender {
+        /** Snapshot name. */
+        private final String snpName;
+
+        /** Local snapshot directory. */
+        private final File snpLocDir;
+
+        /** Local node snapshot directory calculated on snapshot directory. */
+        private File dbDir;
+
+        /** Size of page. */
+        private final int pageSize;
+
+        /**
+         * Creates a sender which stores the snapshot with the given name under the local snapshot directory.
+         *
+         * @param snpName Snapshot name.
+         */
+        public LocalSnapshotSender(String snpName) {
+            super(IgniteSnapshotManager.this.log, snpRunner);
+
+            this.snpName = snpName;
+            this.snpLocDir = snapshotLocalDir(snpName);
+            this.pageSize = cctx.kernalContext().config().getDataStorageConfiguration().getPageSize();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Fails if the node-specific snapshot directory already exists. Otherwise, under the checkpoint read
+         * lock, writes the snapshot name to the metastorage under {@code SNP_RUNNING_KEY} and then ensures
+         * the directory. A checked failure is rethrown as {@code IgniteException}.
+         *
+         * @param partsCnt Number of partitions; not read by this implementation.
+         */
+        @Override protected void init(int partsCnt) {
+            dbDir = new File(snpLocDir, databaseRelativePath(pdsSettings.folderName()));
+
+            if (dbDir.exists()) {
+                throw new IgniteException("Snapshot with given name already exists " +
+                    "[snpName=" + snpName + ", absPath=" + dbDir.getAbsolutePath() + ']');
+            }
+
+            cctx.database().checkpointReadLock();
+
+            try {
+                assert metaStorage != null && metaStorage.read(SNP_RUNNING_KEY) == null :
+                    "The previous snapshot hasn't been completed correctly";
+
+                metaStorage.write(SNP_RUNNING_KEY, snpName);
+
+                U.ensureDirectory(dbDir, "snapshot work directory", log);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                cctx.database().checkpointReadUnlock();
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Copies the whole cache configuration file into the cache directory of the snapshot.
+         */
+        @Override public void sendCacheConfig0(File ccfg, String cacheDirName) {
+            assert dbDir != null;
+
+            File cacheDir;
+
+            try {
+                cacheDir = U.resolveWorkDirectory(dbDir.getAbsolutePath(), cacheDirName, false);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            File ccfgCopy = new File(cacheDir, ccfg.getName());
+
+            copy(ioFactory, ccfg, ccfgCopy, ccfg.length());
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Saves the given marshaller mappings under the local snapshot directory; {@code null} is ignored.
+         */
+        @Override public void sendMarshallerMeta0(List<Map<Integer, MappedName>> mappings) {
+            if (mappings != null)
+                saveMappings(cctx.kernalContext(), mappings, snpLocDir);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Saves the given binary types under the local snapshot directory; {@code null} is ignored.
+         */
+        @Override public void sendBinaryMeta0(Collection<BinaryType> types) {
+            if (types != null)
+                cctx.kernalContext().cacheObjects().saveMetadata(types, snpLocDir);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Copies the first {@code len} bytes of the partition file into the cache directory of the snapshot.
+         * Nothing is done for a zero length.
+         */
+        @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long len) {
+            if (len == 0)
+                return;
+
+            try {
+                String dbPath = dbDir.getAbsolutePath();
+
+                File target = new File(U.resolveWorkDirectory(dbPath, cacheDirName, false), part.getName());
+
+                // A new file is created unless a stale one exists and cannot be deleted.
+                boolean canCreate = !target.exists() || target.delete();
+
+                if (canCreate)
+                    target.createNewFile();
+
+                copy(ioFactory, part, target, len);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Partition has been snapshot [snapshotDir=" + dbDir.getAbsolutePath() +
+                        ", cacheDirName=" + cacheDirName + ", part=" + part.getName() +
+                        ", length=" + part.length() + ", snapshot=" + target.getName() + ']');
+                }
+            }
+            catch (IOException | IgniteCheckedException ex) {
+                throw new IgniteException(ex);
+            }
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Applies the pages saved in the given delta file to the snapshot copy of the partition: the delta
+         * file is read page by page and each page is written to a page store opened over the snapshot
+         * partition file, using the page id taken from the page itself. The writes are enclosed into the
+         * {@code beginRecover()} and {@code finishRecover()} calls of the store. I/O and checked Ignite
+         * errors are rethrown wrapped into an {@code IgniteException}.
+         */
+        @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+            File snpPart = getPartitionFile(dbDir, cacheDirName, pair.getPartitionId());
+
+            if (log.isInfoEnabled()) {
+                log.info("Start partition snapshot recovery with the given delta page file [part=" + snpPart +
+                    ", delta=" + delta + ']');
+            }
+
+            try (FileIO fileIo = ioFactory.create(delta, READ);
+                 FilePageStore pageStore = (FilePageStore)storeFactory
+                     .apply(pair.getGroupId(), false)
+                     .createPageStore(getFlagByPartId(pair.getPartitionId()),
+                         snpPart::toPath,
+                         new LongAdderMetric("NO_OP", null))
+            ) {
+                ByteBuffer pageBuf = ByteBuffer.allocate(pageSize)
+                    .order(ByteOrder.nativeOrder());
+
+                long totalBytes = fileIo.size();
+
+                assert totalBytes % pageSize == 0 : "Given file with delta pages has incorrect size: " + fileIo.size();
+
+                pageStore.beginRecover();
+
+                for (long pos = 0; pos < totalBytes; pos += pageSize) {
+                    long read = fileIo.readFully(pageBuf, pos);
+
+                    assert read == pageBuf.capacity();
+
+                    pageBuf.flip(); // Switch the buffer from being filled to being read.
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Read page given delta file [path=" + delta.getName() +
+                            ", pageId=" + PageIO.getPageId(pageBuf) + ", pos=" + pos + ", pages=" + (totalBytes / pageSize) +
+                            ", crcBuff=" + FastCrc.calcCrc(pageBuf, pageBuf.limit()) + ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
+
+                        pageBuf.rewind(); // NOTE(review): presumably restores the position moved by the CRC calculation - confirm.
+                    }
+
+                    pageStore.write(PageIO.getPageId(pageBuf), pageBuf, 0, false);
+
+                    pageBuf.flip(); // Prepares the buffer for the next read; assumes write() consumed the whole page - TODO confirm.
+                }
+
+                pageStore.finishRecover();
+            }
+            catch (IOException | IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void close0(@Nullable Throwable th) {
+            if (th == null) {
+                if (log.isInfoEnabled())
+                    log.info("Local snapshot sender closed, resources released [dbNodeSnpDir=" + dbDir + ']');
+            }
+            else {
+                deleteSnapshot(snpLocDir, pdsSettings.folderName());
+
+                U.warn(log, "Local snapshot sender closed due to an error occurred", th);
+            }
+        }
+    }
+
+    /** Snapshot start request for {@link DistributedProcess} initiate message. */
+    private static class SnapshotOperationRequest implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Unique snapshot request id. */
+        private final UUID rqId;
+
+        /** Source node id which trigger request. */
+        private final UUID srcNodeId;
+
+        /** Snapshot name. */
+        private final String snpName;
+
+        /** The list of cache groups to include into snapshot. */
+        @GridToStringInclude
+        private final List<Integer> grpIds;
+
+        /** The list of affected by snapshot operation baseline nodes. */
+        @GridToStringInclude
+        private final Set<UUID> bltNodes;
+
+        /** {@code true} if an execution of local snapshot tasks failed with an error. */
+        private volatile boolean hasErr;
+
+        /**
+         * @param snpName Snapshot name.
+         * @param grpIds Cache groups to include into snapshot.
+         */
+        public SnapshotOperationRequest(UUID rqId, UUID srcNodeId, String snpName, List<Integer> grpIds, Set<UUID> bltNodes) {
+            this.rqId = rqId;
+            this.srcNodeId = srcNodeId;
+            this.snpName = snpName;
+            this.grpIds = grpIds;
+            this.bltNodes = bltNodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SnapshotOperationRequest.class, this);
+        }
+    }
+
+    /**
+     * Serializable response of a snapshot operation which carries no data.
+     * NOTE(review): presumably the per-node result type of the snapshot distributed process - confirm at usage site.
+     */
+    private static class SnapshotOperationResponse implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+    }
+
+    /**
+     * Discovery message which starts a snapshot operation. It is the {@code START_SNAPSHOT} initial message
+     * carrying a {@link SnapshotOperationRequest}.
+     */
+    private static class SnapshotStartDiscoveryMessage extends InitMessage<SnapshotOperationRequest>
+        implements SnapshotDiscoveryMessage {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param processId Unique process id.
+         * @param req Snapshot initial request.
+         */
+        public SnapshotStartDiscoveryMessage(UUID processId, SnapshotOperationRequest req) {
+            super(processId, START_SNAPSHOT, req);
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Always {@code true}.
+         */
+        @Override public boolean needExchange() {
+            return true;
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @return Always {@code false}.
+         */
+        @Override public boolean needAssignPartitions() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SnapshotStartDiscoveryMessage.class, this);
+        }
+    }
+
+    /** Future of a cluster snapshot operation which also keeps the request id, snapshot name and timings. */
+    private static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
+        /** Unique snapshot request id. */
+        private final UUID rqId;
+
+        /** Snapshot name. */
+        private final String name;
+
+        /** Snapshot start time. */
+        private final long startTime;
+
+        /** Snapshot finish time. */
+        private volatile long endTime;
+
+        /**
+         * Creates an already completed future with an empty name and zero timings.
+         */
+        public ClusterSnapshotFuture() {
+            rqId = null;
+            name = "";
+            startTime = 0;
+
+            onDone();
+
+            // The onDone override of this class records the current time; this future reports zero instead.
+            endTime = 0;
+        }
+
+        /**
+         * Creates a future already completed with the given error.
+         *
+         * @param name Snapshot name.
+         * @param err Error starting snapshot operation.
+         */
+        public ClusterSnapshotFuture(String name, Exception err) {
+            rqId = null;
+            this.name = name;
+
+            onDone(err);
+
+            startTime = U.currentTimeMillis();
+
+            // The end time is left zero for an operation which failed to start.
+            endTime = 0;
+        }
+
+        /**
+         * Creates a not yet completed future; the start time is taken at construction.
+         *
+         * @param rqId Unique snapshot request id.
+         * @param name Snapshot name.
+         */
+        public ClusterSnapshotFuture(UUID rqId, String name) {
+            startTime = U.currentTimeMillis();
+            this.name = name;
+            this.rqId = rqId;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Records the current time as the snapshot end time before delegating to the parent.
+         */
+        @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            endTime = U.currentTimeMillis();
+
+            return super.onDone(res, err, cancel);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
new file mode 100644
index 0000000..93270e2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageWriteListener;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.copy;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
+
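+/**
+ * Snapshot task of the local node, represented as a future. Being started, the task registers itself as
+ * a checkpoint listener: partitions to snapshot are collected when the checkpoint is marked as begun and
+ * the tasks which send metadata, cache configurations, partition files and their deltas are submitted to
+ * the executor of the {@link SnapshotSender} when the checkpoint begins.
+ */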
+class SnapshotFutureTask extends GridFutureAdapter<Boolean> implements DbCheckpointListener {
+    /** Shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /** File page store manager for accessing cache group associated files. */
+    private final FilePageStoreManager pageStore;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Id of the node which caused the snapshot operation. */
+    private final UUID srcNodeId;
+
+    /** Unique identifier of the snapshot process. */
+    private final String snpName;
+
+    /** Snapshot working directory on the file system. */
+    private final File tmpSnpWorkDir;
+
+    /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
+    private final ThreadLocal<ByteBuffer> locBuff;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    private final FileIOFactory ioFactory;
+
+    /**
+     * The length of each cache partition file.
+     * Partition has value greater than zero only for partitions in OWNING state.
+     * Information is collected under the checkpoint write lock.
+     */
+    private final Map<GroupPartitionId, Long> partFileLengths = new HashMap<>();
+
+    /**
+     * Map of partitions to snapshot and their corresponding delta PageStores.
+     * Writers are pinned to the snapshot context due to controlling partition
+     * processing supplier.
+     */
+    private final Map<GroupPartitionId, PageStoreSerialWriter> partDeltaWriters = new HashMap<>();
+
+    /**
+     * List of cache configuration senders. Each sender is associated with a particular cache
+     * configuration file to monitor its changes (e.g. via SQL add/drop column or SQL index
+     * create/drop operations).
+     */
+    private final List<CacheConfigurationSender> ccfgSndrs = new CopyOnWriteArrayList<>();
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    private final SnapshotSender snpSndr;
+
+    /**
+     * Requested map of cache groups and their partitions to include into snapshot. If the set of partitions
+     * is {@code null} then all OWNING partitions for the given cache group will be included into snapshot.
+     * In this case if all of the partitions have OWNING state the index partition will also be included.
+     * <p>
+     * If partitions for a particular cache group are not provided then they will be collected and added
+     * on checkpoint under the write lock.
+     */
+    private final Map<Integer, Set<Integer>> parts;
+
+    /** Cache group and corresponding partitions collected under the checkpoint write lock. */
+    private final Map<Integer, Set<Integer>> processed = new HashMap<>();
+
+    /** Checkpoint end future. */
+    private final CompletableFuture<Boolean> cpEndFut = new CompletableFuture<>();
+
+    /** Future to wait until the checkpoint mark phase is finished and snapshot tasks are scheduled. */
+    private final GridFutureAdapter<Void> startedFut = new GridFutureAdapter<>();
+
+    /** Absolute path to save intermediate results of cache partitions of this node. */
+    private volatile File tmpConsIdDir;
+
+    /** Future which will be completed when the task is requested to be closed. Will be executed on system pool. */
+    private volatile CompletableFuture<Void> closeFut;
+
+    /** An exception which has occurred during snapshot processing. */
+    private final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /** Flag which indicates that the task is already scheduled on checkpoint. */
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    /**
+     * Creates a task which is already finished with the given exception. Such a task has no context,
+     * sender, directories or requested partitions: all of these fields are {@code null}.
+     *
+     * @param e Exception the snapshot task is finished with.
+     */
+    public SnapshotFutureTask(IgniteCheckedException e) {
+        assert e != null : "Exception for a finished snapshot task must be not null";
+
+        cctx = null;
+        pageStore = null;
+        log = null;
+        snpName = null;
+        srcNodeId = null;
+        tmpSnpWorkDir = null;
+        snpSndr = null;
+        parts = null;
+        ioFactory = null;
+        locBuff = null;
+
+        err.set(e);
+        startedFut.onDone(e);
+
+        // Complete through the parent implementation: the onDone override of this class releases the
+        // sender and the working directories, which are all null for a task created as already failed.
+        super.onDone(null, e);
+    }
+
+    /**
+     * @param cctx Shared context; its page store must be a {@link FilePageStoreManager}.
+     * @param srcNodeId Id of the node which caused the snapshot operation.
+     * @param snpName Unique identifier of snapshot task.
+     * @param tmpWorkDir Directory under which the working directory named after the snapshot is located.
+     * @param ioFactory Factory to working with delta as file storage.
+     * @param snpSndr Snapshot sender which handles execution tasks; must provide an executor.
+     * @param parts Map of cache groups and its partitions to include into snapshot, if set of partitions
+     * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
+     * @param locBuff Thread local buffer for copy-on-write operations.
+     */
+    public SnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts,
+        ThreadLocal<ByteBuffer> locBuff
+    ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not null.";
+        assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
+
+        // Plain copies of the arguments.
+        this.cctx = cctx;
+        this.srcNodeId = srcNodeId;
+        this.snpName = snpName;
+        this.ioFactory = ioFactory;
+        this.snpSndr = snpSndr;
+        this.parts = parts;
+        this.locBuff = locBuff;
+
+        // Values derived from the arguments.
+        pageStore = (FilePageStoreManager)cctx.pageStore();
+        log = cctx.logger(SnapshotFutureTask.class);
+        tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+    }
+
+    /**
+     * @return Snapshot name; {@code null} if the task was created as already finished with an exception.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Id of the node which triggered this operation; {@code null} if the task was created as
+     * already finished with an exception.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * Returns the type of the snapshot operation, expressed as the class of the snapshot sender in use.
+     * NOTE(review): the sender is {@code null} for a task created from an exception, so this call would
+     * fail in that case - confirm callers never invoke it on such a task.
+     *
+     * @return Class of the snapshot sender used by this task.
+     */
+    public Class<? extends SnapshotSender> type() {
+        return snpSndr.getClass();
+    }
+
+    /**
+     * @return Set of cache groups included into snapshot operation; an empty set if the task was created
+     * as already finished with an exception and therefore has no requested partitions.
+     */
+    public Set<Integer> affectedCacheGroups() {
+        // The constructor which accepts an exception leaves the map of requested partitions null.
+        return parts == null ? Collections.<Integer>emptySet() : parts.keySet();
+    }
+
+    /**
+     * @param th An exception which occurred during snapshot processing.
+     */
+    public void acceptException(Throwable th) {
+        if (th == null)
+            return;
+
+        if (err.compareAndSet(null, th))
+            closeAsync();
+
+        startedFut.onDone(th);
+
+        U.warn(log, "Snapshot task has accepted exception to stop: " + th);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Before completing the future releases everything the task holds: closes the partition delta writers
+     * and the cache configuration senders, closes the snapshot sender with the given error, and deletes
+     * the temporary directories. The sender and the working directory are skipped when they are
+     * {@code null}, which is the case for a task created as already finished with an exception.
+     */
+    @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+        for (PageStoreSerialWriter writer : partDeltaWriters.values())
+            U.closeQuiet(writer);
+
+        for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
+            U.closeQuiet(ccfgSndr);
+
+        if (snpSndr != null)
+            snpSndr.close(err);
+
+        // Read the volatile field once.
+        File consIdDir = tmpConsIdDir;
+
+        if (consIdDir != null)
+            U.delete(consIdDir);
+
+        if (tmpSnpWorkDir != null) {
+            // Delete snapshot directory if the task failed or no other files exist.
+            // The error is checked first, so the directory is not walked when it is removed anyway.
+            try {
+                if (err != null || U.fileCount(tmpSnpWorkDir.toPath()) == 0)
+                    U.delete(tmpSnpWorkDir.toPath());
+            }
+            catch (IOException e) {
+                // Keep the cause in the log; the helper is used since the logger may be not set.
+                U.error(log, "Snapshot directory doesn't exist [snpName=" + snpName + ", dir=" + tmpSnpWorkDir + ']', e);
+            }
+        }
+
+        if (err != null)
+            startedFut.onDone(err);
+
+        return super.onDone(res, err);
+    }
+
+    /**
+     * Waits for the result of the start future of this task.
+     *
+     * @throws IgniteCheckedException If getting the result of the start future fails.
+     */
+    public void awaitStarted() throws IgniteCheckedException {
+        startedFut.get();
+    }
+
+    /**
+     * @return {@code true} if an error has been recorded for this task, i.e. the task is requested to be stopped.
+     */
+    private boolean stopping() {
+        return err.get() != null;
+    }
+
+    /**
+     * Initiates snapshot task: resolves the temporary directory of this node, validates the requested
+     * cache groups, creates a directory per cache group and registers the task as a checkpoint listener.
+     * A checked failure is passed to {@link #acceptException(Throwable)}.
+     *
+     * @return {@code true} if task started by this call.
+     */
+    public boolean start() {
+        if (stopping())
+            return false;
+
+        try {
+            if (!started.compareAndSet(false, true))
+                return false;
+
+            String snpWorkPath = tmpSnpWorkDir.getAbsolutePath();
+            String folderName = cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
+
+            tmpConsIdDir = U.resolveWorkDirectory(snpWorkPath, databaseRelativePath(folderName), false);
+
+            for (Integer grpId : parts.keySet()) {
+                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+                if (grp == null)
+                    throw new IgniteCheckedException("Cache group context not found: " + grpId);
+
+                CacheConfiguration<?, ?> grpCfg = grp.config();
+
+                if (!CU.isPersistentCache(grpCfg, cctx.kernalContext().config().getDataStorageConfiguration()))
+                    throw new IgniteCheckedException("In-memory cache groups are not allowed to be snapshot: " + grpId);
+
+                if (grpCfg.isEncryptionEnabled())
+                    throw new IgniteCheckedException("Encrypted cache groups are not allowed to be snapshot: " + grpId);
+
+                // Cache group snapshot directories are created here, on start, in a single thread.
+                File grpDir = cacheWorkDir(tmpConsIdDir, cacheDirName(grpCfg));
+
+                U.ensureDirectory(grpDir, "directory for snapshotting cache group", log);
+            }
+
+            // The listener is removed as soon as the start future is completed.
+            startedFut.listen(f ->
+                ((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
+            );
+
+            ((GridCacheDatabaseSharedManager)cctx.database()).addCheckpointListener(this);
+
+            if (log.isInfoEnabled()) {
+                log.info("Snapshot operation is scheduled on local node and will be handled by the checkpoint " +
+                    "listener [sctx=" + this + ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
+            }
+
+            return true;
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        if (stopping())
+            return;
+
+        ctx.finishedStateFut().listen(f -> {
+            if (f.error() == null)
+                cpEndFut.complete(true);
+            else
+                cpEndFut.completeExceptionally(f.error());
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Collects the partitions to snapshot for every requested cache group, then for each collected
+     * partition creates a delta writer and remembers the current size of its page store, and finally
+     * registers the senders of the cache configuration files. A checked failure, including a cache group
+     * which is not found, is passed to {@link #acceptException(Throwable)}.
+     */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        // The checkpoint write lock is held. Partition page counters are collected under this lock.
+        if (stopping())
+            return;
+
+        try {
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                int grpId = e.getKey();
+                Set<Integer> grpParts = e.getValue();
+
+                CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+
+                // Fail the task with a checked error instead of a NullPointerException on a missing group.
+                if (grp == null) {
+                    throw new IgniteCheckedException("Cache group context has not found " +
+                        "due to the cache group is stopped: " + grpId);
+                }
+
+                GridDhtPartitionTopology top = grp.topology();
+
+                Iterator<GridDhtLocalPartition> iter;
+
+                if (grpParts == null)
+                    iter = top.currentLocalPartitions().iterator();
+                else {
+                    if (grpParts.contains(INDEX_PARTITION)) {
+                        throw new IgniteCheckedException("Index partition cannot be included into snapshot if " +
+                            " set of cache group partitions has been explicitly provided [grpId=" + grpId + ']');
+                    }
+
+                    // NOTE(review): presumably localPartition() may return null for a partition which is not
+                    // created on this node, which would fail below - confirm.
+                    iter = F.iterator(grpParts, top::localPartition, false);
+                }
+
+                Set<Integer> owning = new HashSet<>();
+                Set<Integer> missed = new HashSet<>();
+
+                // Iterate over partitions in particular cache group.
+                while (iter.hasNext()) {
+                    GridDhtLocalPartition part = iter.next();
+
+                    // Only OWNING partitions are included; partitions in any other state are reported as missed.
+                    if (part.state() == GridDhtPartitionState.OWNING)
+                        owning.add(part.id());
+                    else
+                        missed.add(part.id());
+                }
+
+                if (grpParts != null) {
+                    // Partitions have been provided for the cache group, but some of them are not in OWNING state.
+                    // Exit with an error.
+                    if (!missed.isEmpty()) {
+                        throw new IgniteCheckedException("Snapshot operation cancelled due to " +
+                            "not all of requested partitions has OWNING state on local node [grpId=" + grpId +
+                            ", missed=" + missed + ']');
+                    }
+                }
+                else if (!missed.isEmpty()) {
+                    log.warning("All local cache group partitions in OWNING state have been included into a snapshot. " +
+                        "Partitions which have different states skipped. Index partitions has also been skipped " +
+                        "[snpName=" + snpName + ", grpId=" + grpId + ", missed=" + missed + ']');
+                }
+                else if (cctx.kernalContext().query().moduleEnabled()) {
+                    // Partitions have not been provided for the snapshot task and all of them are in
+                    // OWNING state, so the index partition is included into the snapshot too.
+                    owning.add(INDEX_PARTITION);
+                }
+
+                processed.put(grpId, owning);
+            }
+
+            List<CacheConfiguration<?, ?>> ccfgs = new ArrayList<>();
+
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grpId = e.getKey();
+
+                CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+                if (gctx == null) {
+                    throw new IgniteCheckedException("Cache group context has not found " +
+                        "due to the cache group is stopped: " + grpId);
+                }
+
+                for (int partId : e.getValue()) {
+                    GroupPartitionId pair = new GroupPartitionId(grpId, partId);
+
+                    PageStore store = pageStore.getStore(grpId, partId);
+
+                    partDeltaWriters.put(pair,
+                        new PageStoreSerialWriter(store,
+                            partDeltaFile(cacheWorkDir(tmpConsIdDir, cacheDirName(gctx.config())), partId)));
+
+                    partFileLengths.put(pair, store.size());
+                }
+
+                ccfgs.add(gctx.config());
+            }
+
+            pageStore.readConfigurationFiles(ccfgs,
+                (ccfg, ccfgFile) -> ccfgSndrs.add(new CacheConfigurationSender(ccfg.getName(), cacheDirName(ccfg), ccfgFile)));
+        }
+        catch (IgniteCheckedException e) {
+            acceptException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Initializes the snapshot sender with the total number of collected partitions, completes the start
+     * future and submits to the executor of the sender the tasks which send binary metadata, marshaller
+     * mappings, cache configuration files and, for each collected partition, the partition file. The delta
+     * file of a partition is sent after both the partition file is sent and the checkpoint end future is
+     * completed. When all submitted tasks are finished the task is closed asynchronously.
+     */
+    @Override public void onCheckpointBegin(Context ctx) {
+        if (stopping())
+            return;
+
+        assert !processed.isEmpty() : "Partitions to process must be collected under checkpoint mark phase";
+
+        wrapExceptionIfStarted(() -> snpSndr.init(processed.values().stream().mapToInt(Set::size).sum()))
+            .run();
+
+        // Snapshot task can now be started since the checkpoint write lock is released and
+        // no error has happened on task init.
+        if (!startedFut.onDone())
+            return;
+
+        // Submit all tasks for partitions and deltas processing.
+        List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+        if (log.isInfoEnabled())
+            log.info("Submit partition processing tasks with partition allocated lengths: " + partFileLengths);
+
+        Collection<BinaryType> binTypesCopy = cctx.kernalContext()
+            .cacheObjects()
+            .metadata(Collections.emptyList())
+            .values();
+
+        // Process binary meta.
+        futs.add(CompletableFuture.runAsync(
+            wrapExceptionIfStarted(() -> snpSndr.sendBinaryMeta(binTypesCopy)),
+            snpSndr.executor()));
+
+        List<Map<Integer, MappedName>> mappingsCopy = cctx.kernalContext()
+            .marshallerContext()
+            .getCachedMappings();
+
+        // Process marshaller meta.
+        futs.add(CompletableFuture.runAsync(
+            wrapExceptionIfStarted(() -> snpSndr.sendMarshallerMeta(mappingsCopy)),
+            snpSndr.executor()));
+
+        // Send configuration files of all cache groups.
+        for (CacheConfigurationSender ccfgSndr : ccfgSndrs)
+            futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor()));
+
+        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+            int grpId = e.getKey();
+
+            CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
+
+            if (gctx == null) {
+                acceptException(new IgniteCheckedException("Cache group context has not found " +
+                    "due to the cache group is stopped: " + grpId));
+
+                break;
+            }
+
+            // Process partitions for a particular cache group.
+            for (int partId : e.getValue()) {
+                GroupPartitionId pair = new GroupPartitionId(grpId, partId);
+
+                CacheConfiguration<?, ?> ccfg = gctx.config();
+
+                assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair;
+
+                String cacheDirName = cacheDirName(ccfg);
+                Long partLen = partFileLengths.get(pair);
+
+                CompletableFuture<Void> fut0 = CompletableFuture.runAsync(
+                    wrapExceptionIfStarted(() -> {
+                        snpSndr.sendPart(
+                            getPartitionFile(pageStore.workDir(), cacheDirName, partId),
+                            cacheDirName,
+                            pair,
+                            partLen);
+
+                        // Stop partition writer.
+                        partDeltaWriters.get(pair).markPartitionProcessed();
+                    }),
+                    snpSndr.executor())
+                    // Wait for the completion of both futures - checkpoint end, copy partition.
+                    .runAfterBothAsync(cpEndFut,
+                        wrapExceptionIfStarted(() -> {
+                            File delta = partDeltaWriters.get(pair).deltaFile;
+
+                            try {
+                                // Atomically creates a new, empty delta file if and only if
+                                // a file with this name does not yet exist.
+                                delta.createNewFile();
+                            }
+                            catch (IOException ex) {
+                                throw new IgniteCheckedException(ex);
+                            }
+
+                            snpSndr.sendDelta(delta, cacheDirName, pair);
+
+                            boolean deleted = delta.delete(); // The delta file is not needed once it is sent.
+
+                            assert deleted;
+                        }),
+                        snpSndr.executor());
+
+                futs.add(fut0);
+            }
+        }
+
+        int futsSize = futs.size();
+
+        CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]))
+            .whenComplete((res, t) -> {
+                assert t == null : "Exception must never be thrown since a wrapper is used " +
+                    "for each snapshot task: " + t;
+
+                closeAsync();
+            });
+    }
+
+    /**
+     * Decorates the given task: the task is skipped when this snapshot task is already stopping,
+     * and anything the task throws is passed to {@code acceptException} instead of being rethrown.
+     *
+     * @param exec Task to execute, it is allowed to throw.
+     * @return Runnable which reports failures of the given task via {@code acceptException}.
+     */
+    private Runnable wrapExceptionIfStarted(IgniteThrowableRunner exec) {
+        return () -> {
+            if (!stopping()) {
+                try {
+                    exec.run();
+                }
+                catch (Throwable e) {
+                    acceptException(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Schedules completion of this task on the system executor service. The closure is scheduled
+     * only once, every subsequent call returns the future created by the first one.
+     *
+     * @return Future which will be completed when operations truly stopped.
+     */
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (closeFut != null)
+            return closeFut;
+
+        // The error is read once, so the task is completed with the error known at the first call.
+        Throwable err0 = err.get();
+
+        closeFut = CompletableFuture.runAsync(
+            () -> onDone(true, err0),
+            cctx.kernalContext().getSystemExecutorService());
+
+        return closeFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() {
+        // Cancellation is reported as an error of the task, the same way as any other failure.
+        acceptException(new IgniteCheckedException("Snapshot operation has been cancelled by external process " +
+            "[snpName=" + snpName + ']'));
+
+        try {
+            // Block the caller until the task is completely closed.
+            closeAsync().get();
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupted status, otherwise the caller is not able to observe the interruption.
+            Thread.currentThread().interrupt();
+
+            U.error(log, "SnapshotFutureTask cancellation failed", e);
+
+            return false;
+        }
+        catch (ExecutionException e) {
+            U.error(log, "SnapshotFutureTask cancellation failed", e);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        // Tasks of exactly the same class are compared by the snapshot name only.
+        if (o == this)
+            return true;
+
+        boolean sameCls = o != null && o.getClass() == getClass();
+
+        return sameCls && snpName.equals(((SnapshotFutureTask)o).snpName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        // The same value as Objects.hash(snpName) produces for a single element.
+        return 31 + Objects.hashCode(snpName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // String representation is built by the S.toString helper from this class and instance.
+        return S.toString(SnapshotFutureTask.class, this);
+    }
+
+    /**
+     * Sends the configuration file of a single cache to the snapshot sender. The instance registers
+     * itself as a configuration change listener of the page store: when it is notified about its own
+     * cache before the file is sent, the reported file is copied to the temporary snapshot work
+     * directory and that copy is sent (and deleted afterwards) instead.
+     */
+    private class CacheConfigurationSender implements BiConsumer<String, File>, Closeable {
+        /** Cache name associated with configuration file. */
+        private final String cacheName;
+
+        /** Cache directory associated with configuration file. */
+        private final String cacheDirName;
+
+        /** Lock for cache configuration processing. */
+        private final Lock lock = new ReentrantLock();
+
+        /** Configuration file to send. Replaced by the temporary copy when the copy is created. */
+        private volatile File ccfgFile;
+
+        /** {@code true} if configuration file already sent or this sender has been closed. */
+        private volatile boolean sent;
+
+        /**
+         * {@code true} if an old configuration file written to the temp directory and
+         * waiting to be sent.
+         */
+        private volatile boolean fromTemp;
+
+        /**
+         * @param cacheName Cache name the configuration file belongs to.
+         * @param cacheDirName Cache directory.
+         * @param ccfgFile Cache configuration to send.
+         */
+        public CacheConfigurationSender(String cacheName, String cacheDirName, File ccfgFile) {
+            this.cacheName = cacheName;
+            this.cacheDirName = cacheDirName;
+            this.ccfgFile = ccfgFile;
+
+            // Registered last, when all the fields read by the listener are already assigned.
+            pageStore.addConfigurationChangeListener(this);
+        }
+
+        /**
+         * Send the original cache configuration file or the temp one instead saved due to
+         * concurrent configuration change operation happened (e.g. SQL add/drop column).
+         */
+        public void sendCacheConfig() {
+            lock.lock();
+
+            try {
+                snpSndr.sendCacheConfig(ccfgFile, cacheDirName);
+
+                // Nothing more to track: stop listening and remove the temporary copy, if any.
+                close0();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void accept(String cacheName, File ccfgFile) {
+            assert ccfgFile.exists() :
+                "Cache configuration file must exist [cacheName=" + cacheName +
+                    ", ccfgFile=" + ccfgFile.getAbsolutePath() + ']';
+
+            if (stopping())
+                return;
+
+            // Cheap check without the lock: notifications about other caches are ignored,
+            // as well as notifications received after the file is sent or already copied.
+            if (!cacheName.equals(this.cacheName) || sent || fromTemp)
+                return;
+
+            lock.lock();
+
+            try {
+                // The state might have changed while the lock was being acquired.
+                if (sent || fromTemp)
+                    return;
+
+                File cacheWorkDir = cacheWorkDir(tmpSnpWorkDir, cacheDirName);
+
+                if (!U.mkdirs(cacheWorkDir))
+                    throw new IOException("Unable to create temp directory to copy original configuration file: " + cacheWorkDir);
+
+                File newCcfgFile = new File(cacheWorkDir, ccfgFile.getName());
+                // The result is ignored, so an already existing file is not treated as an error here.
+                newCcfgFile.createNewFile();
+
+                copy(ioFactory, ccfgFile, newCcfgFile, ccfgFile.length());
+
+                // From now on the copy is the file to send, the first copy wins.
+                this.ccfgFile = newCcfgFile;
+                fromTemp = true;
+            }
+            catch (IOException e) {
+                acceptException(e);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Marks the configuration as processed, removes the listener and deletes the temporary copy
+         * of the configuration file if it has been created. Both callers invoke it under the lock.
+         */
+        private void close0() {
+            sent = true;
+            pageStore.removeConfigurationChangeListener(this);
+
+            if (fromTemp)
+                U.delete(ccfgFile);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            lock.lock();
+
+            try {
+                close0();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Listener of page writes of a single partition page store. Pages passed to the listener are
+     * appended to the partition delta file until the writer is stopped, see {@link #stopped()}.
+     */
+    private class PageStoreSerialWriter implements PageWriteListener, Closeable {
+        /** Page store to which current writer is related to. */
+        @GridToStringExclude
+        private final PageStore store;
+
+        /** Partition delta file to store delta pages into. */
+        private final File deltaFile;
+
+        /**
+         * Busy lock to protect write operations: pages are written under the read lock, while
+         * state changes (delta file opening, marking the partition processed, closing) take the write lock.
+         */
+        private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+        /**
+         * {@code true} if need the original page from PageStore instead of given buffer.
+         * Evaluates to {@code true} when the checkpoint end future is done and not completed exceptionally.
+         */
+        @GridToStringExclude
+        private final BooleanSupplier checkpointComplete = () ->
+            cpEndFut.isDone() && !cpEndFut.isCompletedExceptionally();
+
+        /**
+         * Array of bits. 1 - means pages written, 0 - the otherwise.
+         * Size of array can be estimated only under checkpoint write lock.
+         */
+        private final AtomicBitSet writtenPages;
+
+        /** IO over the underlying delta file. Created lazily on the first page write. */
+        @GridToStringExclude
+        private volatile FileIO deltaFileIo;
+
+        /** {@code true} if partition file has been copied to external resource. */
+        private volatile boolean partProcessed;
+
+        /**
+         * Creates the writer and registers it as a write listener of the given page store.
+         * Must be called by a thread which holds the checkpoint lock.
+         *
+         * @param store Partition page store.
+         * @param deltaFile Destination file to write pages to.
+         */
+        public PageStoreSerialWriter(PageStore store, File deltaFile) {
+            assert store != null;
+            assert cctx.database().checkpointLockIsHeldByThread();
+
+            this.deltaFile = deltaFile;
+            this.store = store;
+            // It is important to init {@link AtomicBitSet} under the checkpoint write-lock.
+            // This guarantees that no pages will be modified, so it's safe to init the list
+            // of pages which need to be processed.
+            writtenPages = new AtomicBitSet(store.pages());
+
+            store.addWriteListener(this);
+        }
+
+        /**
+         * @return {@code true} if writer is stopped and cannot write pages: either the checkpoint is
+         * complete and the partition is processed, or the whole snapshot task is stopping.
+         */
+        public boolean stopped() {
+            return (checkpointComplete.getAsBoolean() && partProcessed) || stopping();
+        }
+
+        /**
+         * Mark partition has been processed by another thread. The flag is changed under the write lock,
+         * so it cannot change while a page is being written under the read lock.
+         */
+        public void markPartitionProcessed() {
+            lock.writeLock().lock();
+
+            try {
+                partProcessed = true;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void accept(long pageId, ByteBuffer buf) {
+            assert buf.position() == 0 : buf.position();
+            assert buf.order() == ByteOrder.nativeOrder() : buf.order();
+
+            // Lazy creation of the delta file IO, re-checked under the write lock.
+            if (deltaFileIo == null) {
+                lock.writeLock().lock();
+
+                try {
+                    if (stopped())
+                        return;
+
+                    if (deltaFileIo == null)
+                        deltaFileIo = ioFactory.create(deltaFile);
+                }
+                catch (IOException e) {
+                    acceptException(e);
+                }
+                finally {
+                    lock.writeLock().unlock();
+                }
+            }
+
+            // Stays -1 in the error message if the failure happens before the index is resolved.
+            int pageIdx = -1;
+
+            lock.readLock().lock();
+
+            try {
+                if (stopped())
+                    return;
+
+                pageIdx = PageIdUtils.pageIndex(pageId);
+
+                if (checkpointComplete.getAsBoolean()) {
+                    // Page already written (or its index is out of range of the bit set).
+                    if (!writtenPages.touch(pageIdx))
+                        return;
+
+                    // NOTE(review): locBuff looks like a thread-local page buffer - confirm at its declaration.
+                    final ByteBuffer locBuf = locBuff.get();
+
+                    assert locBuf.capacity() == store.getPageSize();
+
+                    locBuf.clear();
+
+                    // The page content is taken from the page store rather than from the given buffer.
+                    if (!store.read(pageId, locBuf, true))
+                        return;
+
+                    locBuf.flip();
+
+                    writePage0(pageId, locBuf);
+                }
+                else {
+                    // Direct buffer needs to be written, associated checkpoint not finished yet.
+                    writePage0(pageId, buf);
+
+                    // Page marked as written to delta file, so there is no need to
+                    // copy it from file when the first checkpoint associated with
+                    // current snapshot task ends.
+                    writtenPages.touch(pageIdx);
+                }
+            }
+            catch (Throwable ex) {
+                acceptException(new IgniteCheckedException("Error during writing pages to delta partition file " +
+                    "[pageIdx=" + pageIdx + ", writer=" + this + ']', ex));
+            }
+            finally {
+                lock.readLock().unlock();
+            }
+        }
+
+        /**
+         * Writes the whole given buffer to the delta file.
+         *
+         * @param pageId Page ID.
+         * @param pageBuf Page buffer to write.
+         * @throws IOException If page writing failed (IO error occurred).
+         */
+        private void writePage0(long pageId, ByteBuffer pageBuf) throws IOException {
+            assert deltaFileIo != null : "Delta pages storage is not inited: " + this;
+            assert pageBuf.position() == 0;
+            assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer order " + pageBuf.order()
+                + " should be same with " + ByteOrder.nativeOrder();
+
+            if (log.isDebugEnabled()) {
+                log.debug("onPageWrite [pageId=" + pageId +
+                    ", pageIdBuff=" + PageIO.getPageId(pageBuf) +
+                    ", fileSize=" + deltaFileIo.size() +
+                    ", crcBuff=" + FastCrc.calcCrc(pageBuf, pageBuf.limit()) +
+                    ", crcPage=" + PageIO.getCrc(pageBuf) + ']');
+
+                // Presumably the CRC calculation above moves the buffer position, so it is reset before the write.
+                pageBuf.rewind();
+            }
+
+            // Write buffer to the end of the file.
+            deltaFileIo.writeFully(pageBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            // The write lock excludes page writes which are performed under the read lock.
+            lock.writeLock().lock();
+
+            try {
+                U.closeQuiet(deltaFileIo);
+
+                deltaFileIo = null;
+
+                store.removeWriteListener(this);
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PageStoreSerialWriter.class, this);
+        }
+    }
+
+    /**
+     * Fixed-size set of bits which can be switched on concurrently by several threads.
+     * A bit which has been set is never cleared.
+     */
+    private static class AtomicBitSet {
+        /** Container of bits, each element holds 32 of them. */
+        private final AtomicIntegerArray arr;
+
+        /** Number of bits in the set. */
+        private final int size;
+
+        /**
+         * @param size Number of bits in the set.
+         */
+        public AtomicBitSet(int size) {
+            this.size = size;
+
+            arr = new AtomicIntegerArray((size + 31) >>> 5);
+        }
+
+        /**
+         * Sets the bit at the given position.
+         *
+         * @param off Bit position to change.
+         * @return {@code true} if bit has been set,
+         * {@code false} if bit changed by another thread or out of range.
+         */
+        public boolean touch(long off) {
+            // A negative position is out of range too: without this check it is converted below
+            // to a wrong or negative bucket index instead of being rejected.
+            if (off < 0 || off >= size)
+                return false;
+
+            // For an int operand only the five lowest bits of the shift distance are used.
+            int bit = 1 << off;
+            int bucket = (int)(off >>> 5);
+
+            while (true) {
+                int cur = arr.get(bucket);
+                int val = cur | bit;
+
+                // The bit is already set, nothing to change.
+                if (cur == val)
+                    return false;
+
+                // Retry if another bit of the same bucket has been changed concurrently.
+                if (arr.compareAndSet(bucket, cur, val))
+                    return true;
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
new file mode 100644
index 0000000..e6b6a72
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import org.apache.ignite.internal.GridKernalContextImpl;
+import org.apache.ignite.mxbean.SnapshotMXBean;
+
+/**
+ * Implementation of the snapshot MX bean which delegates operations to the snapshot manager.
+ */
+public class SnapshotMXBeanImpl implements SnapshotMXBean {
+    /** Snapshot manager all the bean operations are delegated to. */
+    private final IgniteSnapshotManager mgr;
+
+    /**
+     * @param ctx Kernal context the snapshot manager is obtained from.
+     */
+    public SnapshotMXBeanImpl(GridKernalContextImpl ctx) {
+        this.mgr = ctx.cache().context().snapshotMgr();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void createSnapshot(String snpName) {
+        this.mgr.createSnapshot(snpName);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java
new file mode 100644
index 0000000..c48f899
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class of snapshot senders. Public {@code send*} methods are guarded templates: each of them
+ * delegates to the corresponding {@code send*0} method only while the sender is not closed and is
+ * not being closed at the moment, otherwise the call is silently skipped.
+ */
+abstract class SnapshotSender {
+    /** Busy processing lock: senders take the read lock, closing takes the write lock. */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Executor to run operation at. */
+    private final Executor exec;
+
+    /** {@code true} if sender has been closed and nothing can be sent anymore. */
+    private volatile boolean closed;
+
+    /** Ignite logger to use. */
+    protected final IgniteLogger log;
+
+    /**
+     * @param log Ignite logger to use.
+     * @param exec Executor to run internal operations on.
+     */
+    protected SnapshotSender(IgniteLogger log, Executor exec) {
+        this.exec = exec;
+        this.log = log.getLogger(SnapshotSender.class);
+    }
+
+    /**
+     * @return Executor to run internal operations on.
+     */
+    public Executor executor() {
+        return exec;
+    }
+
+    /**
+     * @param mappings Local node marshaller mappings, {@code null} is ignored.
+     */
+    public final void sendMarshallerMeta(List<Map<Integer, MappedName>> mappings) {
+        runIfOpen(() -> {
+            if (mappings != null)
+                sendMarshallerMeta0(mappings);
+        });
+    }
+
+    /**
+     * @param types Collection of known binary types, {@code null} is ignored.
+     */
+    public final void sendBinaryMeta(Collection<BinaryType> types) {
+        runIfOpen(() -> {
+            if (types != null)
+                sendBinaryMeta0(types);
+        });
+    }
+
+    /**
+     * @param ccfg Cache configuration file.
+     * @param cacheDirName Cache group directory name.
+     */
+    public final void sendCacheConfig(File ccfg, String cacheDirName) {
+        runIfOpen(() -> sendCacheConfig0(ccfg, cacheDirName));
+    }
+
+    /**
+     * @param part Partition file to send.
+     * @param cacheDirName Cache group directory name.
+     * @param pair Group id with partition id pair.
+     * @param length Partition length.
+     */
+    public final void sendPart(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+        runIfOpen(() -> sendPart0(part, cacheDirName, pair, length));
+    }
+
+    /**
+     * @param delta Delta pages file.
+     * @param cacheDirName Cache group directory name.
+     * @param pair Group id with partition id pair.
+     */
+    public final void sendDelta(File delta, String cacheDirName, GroupPartitionId pair) {
+        runIfOpen(() -> sendDelta0(delta, cacheDirName, pair));
+    }
+
+    /**
+     * Closes this snapshot sender and releases any resources associated with it.
+     * If the sender is already closed then invoking this method has no effect.
+     *
+     * @param th An exception occurred during snapshot operation processing.
+     */
+    public final void close(@Nullable Throwable th) {
+        lock.writeLock().lock();
+
+        try {
+            // Resources must be released only once.
+            if (closed)
+                return;
+
+            try {
+                close0(th);
+            }
+            finally {
+                // The sender is unusable even if releasing of resources has failed.
+                closed = true;
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Runs the given operation under the read lock if the sender is not closed. The operation is
+     * skipped without waiting if the lock cannot be acquired immediately, i.e. the sender is being closed.
+     *
+     * @param op Operation to run.
+     */
+    private void runIfOpen(Runnable op) {
+        if (!lock.readLock().tryLock())
+            return;
+
+        try {
+            if (!closed)
+                op.run();
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param partsCnt Number of objects to process.
+     */
+    protected abstract void init(int partsCnt);
+
+    /**
+     * @param part Partition file to send.
+     * @param cacheDirName Cache group directory name.
+     * @param pair Group id with partition id pair.
+     * @param length Partition length.
+     */
+    protected abstract void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length);
+
+    /**
+     * @param delta Delta pages file.
+     * @param cacheDirName Cache group directory name.
+     * @param pair Group id with partition id pair.
+     */
+    protected abstract void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair);
+
+    /**
+     * @param mappings Local node marshaller mappings.
+     */
+    protected void sendMarshallerMeta0(List<Map<Integer, MappedName>> mappings) {
+        // No-op by default.
+    }
+
+    /**
+     * @param types Collection of known binary types.
+     */
+    protected void sendBinaryMeta0(Collection<BinaryType> types) {
+        // No-op by default.
+    }
+
+    /**
+     * @param ccfg Cache configuration file.
+     * @param cacheDirName Cache group directory name.
+     */
+    protected void sendCacheConfig0(File ccfg, String cacheDirName) {
+        // No-op by default.
+    }
+
+    /**
+     * Releases resources associated with this sender. Invoked at most once, under the write lock.
+     *
+     * @param th An exception occurred during snapshot operation processing.
+     */
+    protected void close0(@Nullable Throwable th) {
+        // No-op by default.
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 63c282f..cb5bfc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -376,7 +376,7 @@ public class IgniteWalIteratorFactory {
 
         return new GridCacheSharedContext<>(
             kernalCtx, null, null, null,
-            null, null, null, dbMgr, null,
+            null, null, null, dbMgr, null, null,
             null, null, null, null, null,
             null, null, null, null, null, null
         );
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 88203c5..7d0bee1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -106,13 +106,12 @@ import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
 /**
  * Dummy grid kernal context
  */
 public class StandaloneGridKernalContext implements GridKernalContext {
-    /** Binary metadata file store folder. */
-    public static final String BINARY_META_FOLDER = "binary_meta";
-
     /** Config for fake Ignite instance. */
     private final IgniteConfiguration cfg;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index c029705..62e15df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.internal.processors.cacheobject;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
-
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -301,6 +301,12 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
     public Collection<BinaryType> metadata() throws IgniteException;
 
     /**
+     * @param types Collection of binary types to write.
+     * @param dir Destination directory.
+     */
+    public void saveMetadata(Collection<BinaryType> types, File dir);
+
+    /**
      * @param typeName Type name.
      * @param ord ordinal.
      * @return Enum object.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index dad0b44..663a3f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -347,20 +347,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
      * @param mappings Incoming marshaller mappings.
      */
     private void processIncomingMappings(List<Map<Integer, MappedName>> mappings) {
-        if (mappings != null) {
-            for (int i = 0; i < mappings.size(); i++) {
-                Map<Integer, MappedName> map;
-
-                if ((map = mappings.get(i)) != null) {
-                    try {
-                        marshallerCtx.onMappingDataReceived((byte)i, map);
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to process marshaller mapping data", e);
-                    }
-                }
-            }
-        }
+        marshallerCtx.onMappingDataReceived(log, mappings);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index cf05916..8d608f1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -367,8 +367,8 @@ public abstract class IgniteUtils {
     /** Thread dump message. */
     public static final String THREAD_DUMP_MSG = "Thread dump at ";
 
-    /** Correct Mbean cache name pattern. */
-    private static Pattern MBEAN_CACHE_NAME_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
+    /** Alphanumeric with underscore regexp pattern. */
+    private static final Pattern ALPHANUMERIC_UNDERSCORE_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
 
     /** Project home directory. */
     private static volatile GridTuple<String> ggHome;
@@ -4793,13 +4793,21 @@ public abstract class IgniteUtils {
      * @return An escaped string.
      */
     private static String escapeObjectNameValue(String s) {
-        if (MBEAN_CACHE_NAME_PATTERN.matcher(s).matches())
+        if (alphanumericUnderscore(s))
             return s;
 
         return '\"' + s.replaceAll("[\\\\\"?*]", "\\\\$0") + '\"';
     }
 
     /**
+     * @param s String to check.
+     * @return {@code true} if the string is non-empty and has only ASCII letters, digits and underscores.
+     */
+    public static boolean alphanumericUnderscore(String s) {
+        return ALPHANUMERIC_UNDERSCORE_PATTERN.matcher(s).matches();
+    }
+
+    /**
      * Registers MBean with the server.
      *
      * @param <T> Type of mbean.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 3bc719d..93f4725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI3;
@@ -80,17 +82,41 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
     /** Logger. */
     private final IgniteLogger log;
 
+    /** Factory which creates custom {@link InitMessage} for distributed process initialization. */
+    private BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory;
+
+    /**
+     * Creates a process which is initialized with a plain {@link InitMessage}.
+     *
+     * @param ctx Kernal context.
+     * @param type Process type.
+     * @param exec Execute action and returns future with the single node result to send to the coordinator.
+     * @param finish Finish process closure. Called on each node when all single nodes results received.
+     */
+    public DistributedProcess(
+        GridKernalContext ctx,
+        DistributedProcessType type,
+        Function<I, IgniteInternalFuture<R>> exec,
+        CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish
+    ) {
+        this(ctx, type, exec, finish, (id, req) -> new InitMessage<>(id, type, req));
+    }
+
     /**
      * @param ctx Kernal context.
      * @param type Process type.
      * @param exec Execute action and returns future with the single node result to send to the coordinator.
      * @param finish Finish process closure. Called on each node when all single nodes results received.
+     * @param initMsgFactory Factory which creates custom {@link InitMessage} for distributed process initialization.
      */
-    public DistributedProcess(GridKernalContext ctx, DistributedProcessType type,
+    public DistributedProcess(
+        GridKernalContext ctx,
+        DistributedProcessType type,
         Function<I, IgniteInternalFuture<R>> exec,
-        CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish) {
+        CI3<UUID, Map<UUID, R>, Map<UUID, Exception>> finish,
+        BiFunction<UUID, I, ? extends InitMessage<I>> initMsgFactory
+    ) {
         this.ctx = ctx;
         this.type = type;
+        this.initMsgFactory = initMsgFactory;
 
         log = ctx.log(getClass());
 
@@ -218,9 +244,7 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
      */
     public void start(UUID id, I req) {
         try {
-            InitMessage<I> msg = new InitMessage<>(id, type, req);
-
-            ctx.discovery().sendCustomEvent(msg);
+            ctx.discovery().sendCustomEvent(initMsgFactory.apply(id, req));
         }
         catch (IgniteCheckedException e) {
             log.warning("Unable to start process.", e);
@@ -379,6 +403,20 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
          *
          * @see GridEncryptionManager
          */
-        MASTER_KEY_CHANGE_FINISH
+        MASTER_KEY_CHANGE_FINISH,
+
+        /**
+         * Start snapshot procedure.
+         *
+         * @see IgniteSnapshotManager
+         */
+        START_SNAPSHOT,
+
+        /**
+         * End snapshot procedure.
+         *
+         * @see IgniteSnapshotManager
+         */
+        END_SNAPSHOT
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
index e23ba36..4e43f52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
@@ -81,7 +81,7 @@ public class InitMessage<I extends Serializable> implements DiscoveryCustomMessa
     /** {@inheritDoc} */
     @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer,
         DiscoCache discoCache) {
-        return null;
+        return mgr.createDiscoCacheOnCacheChange(topVer, discoCache);
     }
 
     /** @return Process id. */
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
new file mode 100644
index 0000000..22816d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import org.apache.ignite.IgniteSnapshot;
+
+/**
+ * Snapshot features MBean.
+ */
+@MXBeanDescription("MBean that provides access for snapshot features.")
+public interface SnapshotMXBean {
+    /**
+     * Create the cluster-wide snapshot with given name asynchronously.
+     *
+     * @param snpName Name of the snapshot to create.
+     * @see IgniteSnapshot#createSnapshot(String)
+     */
+    @MXBeanDescription("Create cluster-wide snapshot.")
+    public void createSnapshot(@MXBeanParameter(name = "snpName", description = "Snapshot name.") String snpName);
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 8af5747..3123911 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -404,7 +404,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    protected void checkPartitionMapExchangeFinished() {
+    public static void checkPartitionMapExchangeFinished() {
         for (Ignite g : G.allGrids()) {
             IgniteKernal g0 = (IgniteKernal)g;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
index 7045d98..2289d4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
@@ -58,6 +58,7 @@ import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
 import static org.apache.ignite.testframework.GridTestUtils.suppressException;
 
 /**
@@ -546,7 +547,7 @@ public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractT
      */
     private void cleanBinaryMetaFolderForNode(String consId) throws IgniteCheckedException {
         String dfltWorkDir = U.defaultWorkDirectory();
-        File metaDir = U.resolveWorkDirectory(dfltWorkDir, "binary_meta", false);
+        File metaDir = U.resolveWorkDirectory(dfltWorkDir, BINARY_META_FOLDER, false);
 
         for (File subDir : metaDir.listFiles()) {
             if (subDir.getName().contains(consId)) {
@@ -661,7 +662,7 @@ public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractT
 
     /** */
     private static boolean isBinaryMetaFile(File file) {
-        return file.getPath().contains("binary_meta");
+        return file.getPath().contains(BINARY_META_FOLDER);
     }
 
     /** */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
index 69601b9..47b23a09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataOnClusterRestartTest.java
@@ -45,6 +45,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
 /**
  *
  */
@@ -364,8 +366,8 @@ public class IgnitePdsBinaryMetadataOnClusterRestartTest extends GridCommonAbstr
     ) throws Exception {
         String workDir = U.defaultWorkDirectory();
 
-        Path fromFile = Paths.get(workDir, fromWorkDir, "binary_meta", fromConsId, fileName);
-        Path toFile = Paths.get(workDir, toWorkDir, "binary_meta", toConsId, fileName);
+        Path fromFile = Paths.get(workDir, fromWorkDir, BINARY_META_FOLDER, fromConsId, fileName);
+        Path toFile = Paths.get(workDir, toWorkDir, BINARY_META_FOLDER, toConsId, fileName);
 
         Files.copy(fromFile, toFile, StandardCopyOption.REPLACE_EXISTING);
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
index 0e01409..fd8b797 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsNoSpaceLeftOnDeviceTest.java
@@ -31,11 +31,12 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
-import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
+
 /**
  *
  */
@@ -133,7 +134,7 @@ public class IgnitePdsNoSpaceLeftOnDeviceTest extends GridCommonAbstractTest {
         @Override public FileIO create(File file, OpenOption... modes) throws IOException {
             if (unluckyConsistentId.get() != null
                 && file.getAbsolutePath().contains(unluckyConsistentId.get())
-                && file.getAbsolutePath().contains(StandaloneGridKernalContext.BINARY_META_FOLDER))
+                && file.getAbsolutePath().contains(BINARY_META_FOLDER))
                 throw new IOException("No space left on device");
 
             return delegateFactory.create(file, modes);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
index 230cb5c..9ff5914 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorSwitchSegmentTest.java
@@ -150,6 +150,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
                 null,
                 null,
                 null,
+                null,
                 null)
         ).createSerializer(serVer);
 
@@ -470,6 +471,7 @@ public class IgniteWalIteratorSwitchSegmentTest extends GridCommonAbstractTest {
             null,
             null,
             null,
+            null,
             new GridCacheIoManager(),
             null,
             null,
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 5549d69..f530e80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -95,6 +95,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
             null,
             null,
             null,
+            null,
             new CacheDiagnosticManager()
         );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 9125e96..106b9ec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -94,6 +94,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
             null,
             null,
             null,
+            null,
             null
         );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
index 0234196..c575f6d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
@@ -110,6 +110,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest {
             null,
             null,
             null,
+            null,
             new CacheDiagnosticManager()
         );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index 5abccf4..5105489 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -100,6 +100,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
             null,
             null,
             null,
+            null,
             null
         );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 595eac0..4be75b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -620,6 +620,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
             null,
             null,
             null,
+            null,
             null
         );
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
new file mode 100644
index 0000000..2c59eb8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+
+import static java.nio.file.Files.newDirectoryStream;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_SNAPSHOT_TMP_DIR;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/**
+ * Base snapshot tests.
+ */
+public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
+    /** Default snapshot name. */
+    protected static final String SNAPSHOT_NAME = "testSnapshot";
+
+    /** Default number of partitions for cache. */
+    protected static final int CACHE_PARTS_COUNT = 8;
+
+    /** Number of cache keys to pre-create at node start. */
+    protected static final int CACHE_KEYS_RANGE = 1024;
+
+    /** Configuration for the 'default' cache. */
+    protected volatile CacheConfiguration<Integer, Integer> dfltCacheCfg;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi discoSpi = new BlockingCustomMessageDiscoverySpi();
+
+        discoSpi.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder());
+
+        return cfg.setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(100L * 1024 * 1024)
+                    .setPersistenceEnabled(true))
+                .setCheckpointFrequency(3000)
+                .setPageSize(4096))
+            .setCacheConfiguration(dfltCacheCfg)
+            .setClusterStateOnStart(INACTIVE)
+            // Default work directory must be resolved earlier if a snapshot is used to start grids.
+            .setWorkDirectory(U.defaultWorkDirectory())
+            .setDiscoverySpi(discoSpi);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cleanPersistenceDir() throws Exception {
+        super.cleanPersistenceDir();
+
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false));
+    }
+
+    /** @throws Exception If fails. */
+    @Before
+    public void beforeTestSnapshot() throws Exception {
+        cleanPersistenceDir();
+
+        dfltCacheCfg = txCacheConfig(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+    }
+
+    /** @throws Exception If fails. */
+    @After
+    public void afterTestSnapshot() throws Exception {
+        try {
+            for (Ignite ig : G.allGrids()) {
+                if (ig.configuration().isClientMode())
+                    continue;
+
+                File storeWorkDir = ((FilePageStoreManager)((IgniteEx)ig).context()
+                    .cache().context().pageStore()).workDir();
+
+                Path snpTempDir = Paths.get(storeWorkDir.getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR);
+
+                assertEquals("Snapshot working directory must be empty at the moment test execution stopped: " + snpTempDir,
+                    0, U.fileCount(snpTempDir));
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @param ccfg Default cache configuration.
+     * @return Cache configuration.
+     */
+    protected static <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
+        return ccfg.setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setAffinity(new RendezvousAffinityFunction(false, CACHE_PARTS_COUNT));
+    }
+
+    /**
+     * Calculate CRC for all partition files of specified cache.
+     *
+     * @param cacheDir Cache directory to iterate over partition files.
+     * @return The map of [fileName, checksum].
+     */
+    public static Map<String, Integer> calculateCRC32Partitions(File cacheDir) {
+        assert cacheDir.isDirectory() : cacheDir.getAbsolutePath();
+
+        Map<String, Integer> result = new HashMap<>();
+
+        try {
+            try (DirectoryStream<Path> partFiles = newDirectoryStream(cacheDir.toPath(),
+                p -> p.toFile().getName().startsWith(PART_FILE_PREFIX) && p.toFile().getName().endsWith(FILE_SUFFIX))
+            ) {
+                for (Path path : partFiles)
+                    result.put(path.toFile().getName(), FastCrc.calcCrc(path.toFile()));
+            }
+
+            return result;
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
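+     * @param path Root directory to start the search from.
+     * @param dir Name of the directory to search for.
+     * @return Any directory named {@code dir} found while walking {@code path}, or empty if there is none.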
+     * @throws IOException If fails.
+     */
+    public static Optional<Path> searchDirectoryRecursively(Path path, String dir) throws IOException {
+        if (Files.notExists(path))
+            return Optional.empty();
+
+        return Files.walk(path)
+            .filter(Files::isDirectory)
+            .filter(file -> dir.equals(file.getFileName().toString()))
+            .findAny();
+    }
+
+    /**
+     * @param ccfg Default cache configuration.
+     * @return Ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridWithCache(CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+        return startGridsWithCache(1, ccfg, keys);
+    }
+
+    /**
+     * @param grids Number of grids to start.
+     * @param ccfg Default cache configuration.
+     * @param keys Range of cache keys to insert.
+     * @return Ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsWithCache(int grids, CacheConfiguration<Integer, Integer> ccfg, int keys) throws Exception {
+        dfltCacheCfg = ccfg;
+
+        return startGridsWithCache(grids, keys, Integer::new, ccfg);
+    }
+
+    /**
+     * @param grids Number of ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param factory Factory which produces values.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    protected <V> IgniteEx startGridsWithCache(
+        int grids,
+        int keys,
+        Function<Integer, V> factory,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        for (int g = 0; g < grids; g++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(g))
+                .setCacheConfiguration(ccfgs)));
+
+        IgniteEx ig = grid(0);
+
+        ig.cluster().baselineAutoAdjustEnabled(false);
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        for (int i = 0; i < keys; i++) {
+            for (CacheConfiguration<Integer, V> ccfg : ccfgs)
+                ig.getOrCreateCache(ccfg.getName()).put(i, factory.apply(i));
+        }
+
+        forceCheckpoint();
+
+        return ig;
+    }
+
+    /**
+     * @param grids Number of ignite instances.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsWithoutCache(int grids) throws Exception {
+        for (int i = 0; i < grids; i++)
+            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(i)).setCacheConfiguration()));
+
+        IgniteEx ignite = grid(0);
+
+        ignite.cluster().baselineAutoAdjustEnabled(false);
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        return ignite;
+    }
+
+    /**
+     * @param cnt Number of grids to start.
+     * @param snpName Snapshot to start grids from.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsFromSnapshot(int cnt, String snpName) throws Exception {
+        return startGridsFromSnapshot(cnt, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), snpName, true);
+    }
+
+    /**
+     * @param cnt Number of grids to start.
+     * @param path Snapshot path resolver.
+     * @param snpName Snapshot to start grids from.
+     * @return Coordinator ignite instance.
+     * @throws Exception If fails.
+     */
+    protected IgniteEx startGridsFromSnapshot(int cnt,
+        Function<IgniteConfiguration, String> path,
+        String snpName,
+        boolean activate
+    ) throws Exception {
+        IgniteEx crd = null;
+
+        for (int i = 0; i < cnt; i++) {
+            IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(i)));
+
+            cfg.setWorkDirectory(Paths.get(path.apply(cfg), snpName).toString());
+
+            if (crd == null)
+                crd = startGrid(cfg);
+            else
+                startGrid(cfg);
+        }
+
+        crd.cluster().baselineAutoAdjustEnabled(false);
+
+        if (activate)
+            crd.cluster().state(ACTIVE);
+
+        return crd;
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @return Snapshot manager related to given ignite instance.
+     */
+    public static IgniteSnapshotManager snp(IgniteEx ignite) {
+        return ignite.context().cache().context().snapshotMgr();
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @return Directory name for ignite instance.
+     * @throws IgniteCheckedException If fails.
+     */
+    public static String folderName(IgniteEx ignite) throws IgniteCheckedException {
+        return ignite.context().pdsFolderResolver().resolveFolders().folderName();
+    }
+
+    /**
+     * @param cache Ignite cache to check.
+     */
+    protected static void assertSnapshotCacheKeys(IgniteCache<?, ?> cache) {
+        List<Integer> keys = IntStream.range(0, CACHE_KEYS_RANGE).boxed().collect(Collectors.toList());
+
+        cache.query(new ScanQuery<>(null))
+            .forEach(e -> keys.remove((Integer)e.getKey()));
+
+        assertTrue("Snapshot must contains pre-created cache data " +
+            "[cache=" + cache.getName() + ", keysLeft=" + keys + ']', keys.isEmpty());
+    }
+
+    /**
+     * @param ignite Ignite instance to get the discovery SPI from.
+     * @return BlockingCustomMessageDiscoverySpi instance.
+     */
+    protected static BlockingCustomMessageDiscoverySpi discoSpi(IgniteEx ignite) {
+        return (BlockingCustomMessageDiscoverySpi)ignite.context().discovery().getInjectedDiscoverySpi();
+    }
+
+    /** Discovery SPI which holds back custom messages matching a predicate until they are unblocked. */
+    protected static class BlockingCustomMessageDiscoverySpi extends TcpDiscoverySpi {
+        /** List of messages which have been blocked. */
+        private final List<DiscoverySpiCustomMessage> blocked = new CopyOnWriteArrayList<>();
+
+        /** Discovery custom message filter. */
+        private volatile IgnitePredicate<DiscoveryCustomMessage> blockPred;
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
+            if (msg instanceof CustomMessageWrapper) {
+                DiscoveryCustomMessage msg0 = ((CustomMessageWrapper)msg).delegate();
+
+                if (blockPred != null && blockPred.apply(msg0)) {
+                    blocked.add(msg);
+
+                    if (log.isInfoEnabled())
+                        log.info("Discovery message has been blocked: " + msg0);
+
+                    return;
+                }
+            }
+
+            super.sendCustomEvent(msg);
+        }
+
+        /** Start blocking discovery custom messages. */
+        public synchronized void block(IgnitePredicate<DiscoveryCustomMessage> pred) {
+            blockPred = pred;
+        }
+
+        /** Unblock and send previously saved discovery custom messages. */
+        public synchronized void unblock() {
+            blockPred = null;
+
+            for (DiscoverySpiCustomMessage msg : blocked)
+                sendCustomEvent(msg);
+
+            blocked.clear();
+        }
+
+        /**
+         * @param timeout Timeout to wait for at least one message to be blocked.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        public void waitBlocked(long timeout) throws IgniteInterruptedCheckedException {
+            GridTestUtils.waitForCondition(() -> !blocked.isEmpty(), timeout);
+        }
+    }
+
+    /** Snapshot sender which forwards all calls to the wrapped delegate. */
+    protected static class DelegateSnapshotSender extends SnapshotSender {
+        /** Sender to delegate calls to. */
+        protected final SnapshotSender delegate;
+
+        /**
+         * @param delegate Sender to delegate calls to.
+         */
+        public DelegateSnapshotSender(IgniteLogger log, Executor exec, SnapshotSender delegate) {
+            super(log, exec);
+
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            delegate.init(partsCnt);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendCacheConfig0(File ccfg, String cacheDirName) {
+            delegate.sendCacheConfig(ccfg, cacheDirName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMarshallerMeta0(List<Map<Integer, MappedName>> mappings) {
+            delegate.sendMarshallerMeta(mappings);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendBinaryMeta0(Collection<BinaryType> types) {
+            delegate.sendBinaryMeta(types);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+            delegate.sendPart(part, cacheDirName, pair, length);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+            delegate.sendDelta(delta, cacheDirName, pair);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close0(Throwable th) {
+            delegate.close(th);
+        }
+    }
+
+    /** Account item. */
+    protected static class Account implements Serializable {
+        /** Serial version. */
+        private static final long serialVersionUID = 0L;
+
+        /** User id. */
+        @QuerySqlField(index = true)
+        private final int id;
+
+        /** User balance. */
+        @QuerySqlField
+        protected int balance;
+
+        /**
+         * @param id User id.
+         * @param balance User balance.
+         */
+        public Account(int id, int balance) {
+            this.id = id;
+            this.balance = balance;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Account item = (Account)o;
+
+            return id == item.id &&
+                balance == item.balance;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(id, balance);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
new file mode 100644
index 0000000..7fe818a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -0,0 +1,945 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.ObjectGauge;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.FullMessage;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
+import static org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest.checkPartitionMapExchangeFinished;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_IN_PROGRESS_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.isSnapshotOperation;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Cluster-wide snapshot test.
+ */
+public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
+    /** Time to wait while rebalance may happen. Used as the timeout when waiting for blocked demand messages. */
+    private static final long REBALANCE_AWAIT_TIME = GridTestUtils.SF.applyLB(10_000, 3_000);
+
+    /** Cache configuration for test: an ATOMIC cache named "atomicCacheName" with two backups. */
+    private static CacheConfiguration<Integer, Integer> atomicCcfg = new CacheConfiguration<Integer, Integer>("atomicCacheName")
+        .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+        .setBackups(2)
+        .setAffinity(new RendezvousAffinityFunction(false, CACHE_PARTS_COUNT));
+
+    /** {@code true} if node should be started in separate jvm. Reset to {@code false} before each test. */
+    protected volatile boolean jvm;
+
+    /**
+     * Runs the parent per-test preparation and then resets the {@link #jvm} flag.
+     *
+     * @throws Exception If fails.
+     */
+    @Before
+    @Override public void beforeTestSnapshot() throws Exception {
+        super.beforeTestSnapshot();
+
+        // A test which needs a node in a separate JVM switches the flag on by itself.
+        jvm = false;
+    }
+
+    /**
+     * Take snapshot from the whole cluster and check snapshot consistency when the
+     * cluster tx load starts on a new topology version.
+     * Note: Client nodes and server nodes not in baseline topology must not be affected.
+     * <p>
+     * Entries written before the snapshot have negative values while the concurrent load writes
+     * non-negative values only, so a non-negative value found after the restore means that the
+     * snapshot captured data which it must not contain.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testConsistentClusterSnapshotLoadNewTopology() throws Exception {
+        int grids = 3;
+        String snpName = "backup23012020";
+        AtomicInteger atKey = new AtomicInteger(CACHE_KEYS_RANGE);
+        AtomicInteger txKey = new AtomicInteger(CACHE_KEYS_RANGE);
+
+        IgniteEx ignite = startGrids(grids);
+        startClientGrid();
+
+        ignite.cluster().baselineAutoAdjustEnabled(false);
+        ignite.cluster().state(ACTIVE);
+
+        // Start node not in baseline.
+        IgniteEx notBltIgnite = startGrid(grids);
+
+        // The directory must be resolved for the snapshot which is actually created below,
+        // otherwise the check for the non-baseline node looks into an unrelated directory.
+        File locSnpDir = snp(notBltIgnite).snapshotLocalDir(snpName);
+        String notBltDirName = folderName(notBltIgnite);
+
+        IgniteCache<Integer, Integer> atCache = ignite.createCache(atomicCcfg);
+
+        for (int idx = 0; idx < CACHE_KEYS_RANGE; idx++) {
+            atCache.put(atKey.incrementAndGet(), -1);
+            ignite.cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), -1);
+        }
+
+        forceCheckpoint();
+
+        CountDownLatch loadLatch = new CountDownLatch(1);
+
+        ignite.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            /** {@inheritDoc} */
+            @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                // First discovery custom event will be a snapshot operation.
+                assertTrue(isSnapshotOperation(fut.firstEvent()));
+                assertTrue("Snapshot must use pme-free exchange", fut.context().exchangeFreeSwitch());
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                if (fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
+                    return;
+
+                DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)fut.firstEvent()).customMessage();
+
+                assertNotNull(msg);
+
+                // The load is released only after the snapshot exchange has taken the topology lock.
+                if (msg instanceof SnapshotDiscoveryMessage)
+                    loadLatch.countDown();
+            }
+        });
+
+        // Start cache load.
+        IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(loadLatch);
+
+                while (!Thread.currentThread().isInterrupted()) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int atIdx = rnd.nextInt(grids);
+
+                    // Zero out the sign bit. Each cache keeps using its own key counter.
+                    grid(atIdx).cache(atomicCcfg.getName()).put(atKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+
+                    int txIdx = rnd.nextInt(grids);
+
+                    grid(txIdx).cache(DEFAULT_CACHE_NAME).put(txKey.incrementAndGet(), rnd.nextInt() & Integer.MAX_VALUE);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new RuntimeException(e);
+            }
+        }, 3, "cache-put-");
+
+        try {
+            IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(snpName);
+
+            U.await(loadLatch, 10, TimeUnit.SECONDS);
+
+            fut.get();
+        }
+        finally {
+            loadFut.cancel();
+        }
+
+        // Cluster can be deactivated but we must test snapshot restore when binary recovery also occurred.
+        stopAllGrids();
+
+        assertFalse("Snapshot directory must be empty for node not in baseline topology: " + notBltDirName,
+            searchDirectoryRecursively(locSnpDir.toPath(), notBltDirName).isPresent());
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, snpName);
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + DEFAULT_CACHE_NAME,
+            CACHE_KEYS_RANGE, snpIg0.cache(DEFAULT_CACHE_NAME).size());
+
+        assertEquals("The number of all (primary + backup) cache keys mismatch for cache: " + atomicCcfg.getName(),
+            CACHE_KEYS_RANGE, snpIg0.cache(atomicCcfg.getName()).size());
+
+        snpIg0.cache(DEFAULT_CACHE_NAME).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + DEFAULT_CACHE_NAME + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+
+        snpIg0.cache(atomicCcfg.getName()).query(new ScanQuery<>(null))
+            .forEach(e -> assertTrue("Snapshot must contains only negative values " +
+                "[cache=" + atomicCcfg.getName() + ", entry=" + e + ']', (Integer)e.getValue() < 0));
+    }
+
+    /**
+     * Takes a snapshot under concurrent transactional and atomic load and checks that no rebalance
+     * is requested after the cluster has been restored from it, i.e. primary and backup partitions
+     * stored in the snapshot have the same counters.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotPrimaryBackupsTheSame() throws Exception {
+        int grids = 3;
+        AtomicInteger keyGen = new AtomicInteger();
+
+        IgniteEx ignite = startGridsWithCache(grids, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // Transactional load: two puts per transaction on a randomly chosen node.
+        IgniteInternalFuture<Long> txLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                IgniteEx node = grid(ThreadLocalRandom.current().nextInt(grids));
+
+                IgniteCache<Integer, Integer> txCache = node.getOrCreateCache(dfltCacheCfg.getName());
+
+                try (Transaction tx = node.transactions().txStart()) {
+                    txCache.put(keyGen.incrementAndGet(), 0);
+
+                    txCache.put(keyGen.incrementAndGet(), 1);
+
+                    tx.commit();
+                }
+            }
+        }, 5, "tx-cache-put-");
+
+        // Atomic load: single puts on a randomly chosen node.
+        IgniteInternalFuture<Long> atomicLoadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            while (!Thread.currentThread().isInterrupted()) {
+                int nodeIdx = ThreadLocalRandom.current().nextInt(grids);
+
+                IgniteCache<Integer, Integer> atomicCache = grid(nodeIdx).getOrCreateCache(atomicCcfg);
+
+                atomicCache.put(keyGen.incrementAndGet(), 0);
+            }
+        }, 5, "atomic-cache-put-");
+
+        try {
+            ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            atomicLoadFut.cancel();
+            txLoadFut.cancel();
+        }
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, cfg -> resolveSnapshotWorkDirectory(cfg).getAbsolutePath(), SNAPSHOT_NAME, false);
+
+        // Block whole rebalancing.
+        for (Ignite node : G.allGrids())
+            TestRecordingCommunicationSpi.spi(node).blockMessages((rcv, msg) -> msg instanceof GridDhtPartitionDemandMessage);
+
+        snpIg0.cluster().state(ACTIVE);
+
+        // A blocked demand message on any node means that some partition had to be rebalanced.
+        boolean demandSeen = GridTestUtils.waitForCondition(() -> {
+            boolean blocked = false;
+
+            for (Ignite node : G.allGrids()) {
+                if (TestRecordingCommunicationSpi.spi(node).hasBlockedMessages())
+                    blocked = true;
+            }
+
+            return blocked;
+        }, REBALANCE_AWAIT_TIME);
+
+        assertFalse("Primary and backup in snapshot must have the same counters. Rebalance must not happen.",
+            demandSeen);
+
+        TestRecordingCommunicationSpi.stopBlockAll();
+
+        assertPartitionsSame(idleVerify(snpIg0, dfltCacheCfg.getName(), atomicCcfg.getName()));
+    }
+
+    /**
+     * Moves money between the accounts of the "east" and "west" caches in transactions while a cluster
+     * snapshot is being taken. The sum of all balances is an invariant which is checked on the initial
+     * data, on the live cluster after the load and on the cluster restored from the snapshot.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotConsistencyUnderLoad() throws Exception {
+        int clientsCnt = 50;
+        int balance = 10_000;
+        int transferLimit = 1000;
+        int total = clientsCnt * balance * 2;
+        int grids = 3;
+        int transferThreadCnt = 4;
+        AtomicBoolean stop = new AtomicBoolean(false);
+        CountDownLatch txStarted = new CountDownLatch(1);
+
+        CacheConfiguration<Integer, Account> eastCcfg = txCacheConfig(new CacheConfiguration<>("east"));
+        CacheConfiguration<Integer, Account> westCcfg = txCacheConfig(new CacheConfiguration<>("west"));
+
+        startGridsWithCache(grids, clientsCnt, key -> new Account(key, balance), eastCcfg, westCcfg);
+
+        Ignite client = startClientGrid(grids);
+
+        int initSum = sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName());
+
+        assertEquals("The initial summary value in all caches is not correct.", total, initSum);
+
+        forceCheckpoint();
+
+        IgniteInternalFuture<?> txLoadFut = GridTestUtils.runMultiThreadedAsync(
+            () -> {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                try {
+                    while (!stop.get()) {
+                        IgniteEx node = grid(rnd.nextInt(grids));
+                        IgniteCache<Integer, Account> east = node.cache("east");
+                        IgniteCache<Integer, Account> west = node.cache("west");
+
+                        int amount = rnd.nextInt(transferLimit);
+
+                        // Let the main thread start the snapshot as soon as the load is about to run.
+                        txStarted.countDown();
+
+                        try (Transaction tx = node.transactions().txStart()) {
+                            Integer id = rnd.nextInt(clientsCnt);
+
+                            Account from = east.get(id);
+                            Account to = west.get(id);
+
+                            from.balance -= amount;
+                            to.balance += amount;
+
+                            east.put(id, from);
+                            west.put(id, to);
+
+                            tx.commit();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    U.error(log, e);
+
+                    fail("Tx must not be failed.");
+                }
+            }, transferThreadCnt, "transfer-account-thread-");
+
+        try {
+            U.await(txStarted);
+
+            grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        txLoadFut.get();
+
+        int liveSum = sumAllCacheValues(client, clientsCnt, eastCcfg.getName(), westCcfg.getName());
+
+        assertEquals("The summary value should not changed during tx transfers.", total, liveSum);
+
+        stopAllGrids();
+
+        IgniteEx snpIg0 = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        int snpSum = sumAllCacheValues(snpIg0, clientsCnt, eastCcfg.getName(), westCcfg.getName());
+
+        assertEquals("The total amount of all cache values must not changed in snapshot.", total, snpSum);
+    }
+
+    /**
+     * Takes a snapshot of a cache which is configured with a node filter and checks that the cache
+     * group descriptor of the restored cluster still has a node filter giving the same answer.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithCacheNodeFilter() throws Exception {
+        int grids = 4;
+
+        CacheConfiguration<Integer, Integer> ccfg = txCacheConfig(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setNodeFilter(node -> node.consistentId().toString().endsWith("1"));
+
+        IgniteEx ig0 = startGridsWithoutCache(grids);
+
+        // The cache is created once, there is no need to resolve it again for every key.
+        IgniteCache<Integer, Integer> cache = ig0.getOrCreateCache(ccfg);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            cache.put(i, i);
+
+        ig0.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        // Nodes are restarted with the cache configuration cleared from the node configuration.
+        IgniteEx snp = startGridsFromSnapshot(grids,
+            cfg -> resolveSnapshotWorkDirectory(cfg.setCacheConfiguration()).getAbsolutePath(),
+            SNAPSHOT_NAME,
+            true);
+
+        awaitPartitionMapExchange();
+        checkCacheDiscoveryDataConsistent();
+
+        CacheGroupDescriptor descr = snp.context().cache().cacheGroupDescriptors()
+            .get(CU.cacheId(ccfg.getName()));
+
+        assertNotNull(descr);
+        assertNotNull(descr.config().getNodeFilter());
+        assertEquals(ccfg.getNodeFilter().apply(grid(1).localNode()),
+            descr.config().getNodeFilter().apply(grid(1).localNode()));
+        assertSnapshotCacheKeys(snp.cache(ccfg.getName()));
+    }
+
+    /**
+     * Checks that a cache cannot be destroyed while a cluster snapshot is in progress,
+     * whereas creating a new cache at the same time is allowed.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testRejectCacheStopDuringClusterSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // Block the full message, so cluster-wide snapshot operation would not be fully completed.
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> {
+            if (msg instanceof FullMessage) {
+                FullMessage<?> msg0 = (FullMessage<?>)msg;
+
+                assertEquals("Snapshot distributed process must be used",
+                    DistributedProcess.DistributedProcessType.START_SNAPSHOT.ordinal(), msg0.type());
+
+                assertTrue("Snapshot has to be finished successfully on all nodes", msg0.error().isEmpty());
+
+                return true;
+            }
+
+            return false;
+        });
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // Creating of new caches should not be blocked.
+        // NOTE(review): setName() is invoked on the shared dfltCacheCfg instance and presumably renames it
+        // in place - confirm that no later code relies on the original name.
+        ignite.getOrCreateCache(dfltCacheCfg.setName("default2"))
+            .put(1, 1);
+
+        forceCheckpoint();
+
+        // Destroying a cache must be rejected until the snapshot operation completes.
+        assertThrowsAnyCause(log,
+            () -> {
+                ignite.destroyCache(DEFAULT_CACHE_NAME);
+
+                return 0;
+            },
+            IgniteCheckedException.class,
+            SNP_IN_PROGRESS_ERR_MSG);
+
+        // Release the full message and let the snapshot operation finish.
+        spi.unblock();
+
+        fut.get();
+    }
+
+    /**
+     * Checks that topology changes are allowed while the cluster snapshot is in progress: a server node
+     * out of the baseline joins and leaves, a client node joins and the baseline topology is changed.
+     * The snapshot must complete successfully and must not contain the folder of the node which has left.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testBltChangeDuringClusterSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        startGrid(3);
+
+        long topVer = ignite.cluster().topologyVersion();
+
+        // Keep the snapshot operation uncompleted until the full message is released.
+        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+        spi.block((msg) -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        spi.waitBlocked(10_000L);
+
+        // Not baseline node joins successfully.
+        String grid4Dir = folderName(startGrid(4));
+
+        // Not blt node left the cluster and snapshot not affected.
+        stopGrid(4);
+
+        // Client node must connect successfully.
+        startClientGrid(4);
+
+        // Changing baseline complete successfully.
+        ignite.cluster().setBaselineTopology(topVer);
+
+        spi.unblock();
+
+        fut.get();
+
+        assertFalse("Snapshot directory must not contain the folder of the node which joined and left " +
+            "during the snapshot operation: " + grid4Dir,
+            searchDirectoryRecursively(snp(ignite).snapshotLocalDir(SNAPSHOT_NAME).toPath(), grid4Dir).isPresent());
+    }
+
+    /**
+     * Checks that the snapshot future is completed with a node stopping error when the node
+     * which has started the snapshot is closed before the operation completes.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
+        IgniteEx initiator = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        // The operation cannot be completed while the full message is held back.
+        BlockingCustomMessageDiscoverySpi discoSpi = discoSpi(initiator);
+        discoSpi.block(msg -> msg instanceof FullMessage);
+
+        IgniteFuture<Void> snpFut = initiator.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        discoSpi.waitBlocked(10_000L);
+
+        initiator.close();
+
+        assertThrowsAnyCause(log, () -> snpFut.get(), NodeStoppingException.class, SNP_NODE_STOPPING_ERR_MSG);
+    }
+
+    /**
+     * Checks that a second snapshot with the same name is rejected and that the rejected attempt
+     * leaves the snapshot taken first intact.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotExistsException() throws Exception {
+        IgniteEx ig = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        IgniteFuture<Void> firstFut = ig.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        firstFut.get();
+
+        // The name is already taken, so the second attempt has to fail.
+        assertThrowsAnyCause(log,
+            () -> {
+                IgniteFuture<Void> secondFut = ig.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+                return secondFut.get();
+            },
+            IgniteException.class,
+            "Snapshot with given name already exists on local node.");
+
+        stopAllGrids();
+
+        // Check that snapshot has not been accidentally deleted.
+        IgniteEx restored = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(restored.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Checks that the snapshot fails and its files are removed on both nodes when one of the nodes
+     * is stopped in the middle of the snapshot operation.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotCleanedOnLeft() throws Exception {
+        CountDownLatch block = new CountDownLatch(1);
+        CountDownLatch partProcessed = new CountDownLatch(1);
+
+        IgniteEx ig0 = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        IgniteEx ig1 = grid(1);
+
+        File locSnpDir = snp(ig0).snapshotLocalDir(SNAPSHOT_NAME);
+        String dirNameIgnite0 = folderName(ig0);
+        String dirNameIgnite1 = folderName(ig1);
+
+        // The local snapshot task of the second node is driven by the two latches.
+        snp(ig1).localSnapshotSenderFactory(blockingLocalSnapshotSender(ig1, partProcessed, block));
+
+        TestRecordingCommunicationSpi.spi(ig1)
+            .blockMessages((node, msg) -> msg instanceof SingleNodeMessage);
+
+        IgniteFuture<?> fut = ig0.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        U.await(partProcessed);
+
+        stopGrid(1);
+
+        block.countDown();
+
+        assertThrowsAnyCause(log,
+            () -> fut.get(),
+            IgniteCheckedException.class,
+            "Snapshot creation has been finished with an error");
+
+        boolean dir0Exists = searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite0).isPresent();
+
+        assertFalse("Snapshot directory must be empty for node 0 due to snapshot future fail: " + dirNameIgnite0,
+            dir0Exists);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        // Snapshot directory must be cleaned.
+        boolean dir1Exists = searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent();
+
+        assertFalse("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
+            dir1Exists);
+
+        List<String> allSnapshots = snp(ig0).localSnapshotNames();
+
+        assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots,
+            allSnapshots.isEmpty());
+    }
+
+    /**
+     * Checks the cleanup of an unfinished snapshot: the third node is started in a separate JVM with a
+     * file I/O factory wrapped into {@code HaltJvmFileIOFactory}, the snapshot is expected to fail, and
+     * the leftovers of that node are expected to disappear once the node is started again.
+     * A snapshot with the same name is then taken and restored successfully.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testRecoveryClusterSnapshotJvmHalted() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        String grid0Dir = folderName(ignite);
+        String grid1Dir = folderName(grid(1));
+        File locSnpDir = snp(ignite).snapshotLocalDir(SNAPSHOT_NAME);
+
+        // The next node has to be started in a separate JVM.
+        jvm = true;
+
+        IgniteConfiguration cfg2 = optimize(getConfiguration(getTestIgniteInstanceName(2)));
+
+        // NOTE(review): presumably the factory halts the JVM as soon as the predicate matches - confirm
+        // against HaltJvmFileIOFactory. The predicate is cast to Serializable as well.
+        cfg2.getDataStorageConfiguration()
+            .setFileIOFactory(new HaltJvmFileIOFactory(new RandomAccessFileIOFactory(),
+                (Predicate<File> & Serializable) file -> {
+                    // Matches every file whose absolute path contains the snapshot name.
+                    return file.getAbsolutePath().contains(SNAPSHOT_NAME);
+                }));
+
+        startGrid(cfg2);
+
+        String grid2Dir = U.maskForFileName(cfg2.getConsistentId().toString());
+
+        // All the nodes started from now on run in the current JVM again.
+        jvm = false;
+
+        // Include the third node into the baseline, so it takes part in the snapshot.
+        ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+
+        awaitPartitionMapExchange();
+
+        assertThrowsAnyCause(log,
+            () -> ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(),
+            IgniteCheckedException.class,
+            "Snapshot creation has been finished with an error");
+
+        assertTrue("Snapshot directory must be empty: " + grid0Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid0Dir).isPresent());
+
+        assertTrue("Snapshot directory must be empty: " + grid1Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid1Dir).isPresent());
+
+        assertTrue("Snapshot directory must exist due to grid2 has been halted and cleanup not fully performed: " + grid2Dir,
+            searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        // Start the third node again, this time in the current JVM and with the default configuration.
+        IgniteEx grid2 = startGrid(2);
+
+        assertTrue("Snapshot directory must be empty after recovery: " + grid2Dir,
+            !searchDirectoryRecursively(locSnpDir.toPath(), grid2Dir).isPresent());
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Snapshot directory must be empty", grid2.context().cache().context().snapshotMgr().localSnapshotNames().isEmpty());
+
+        // The same snapshot name can be used again after the failed attempt has been cleaned up.
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Starts a snapshot while the supply messages sent by the first node are blocked, releases them
+     * afterwards and checks the cache keys on the cluster restored from the snapshot.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithRebalancing() throws Exception {
+        IgniteEx ig0 = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        TestRecordingCommunicationSpi supplierSpi = TestRecordingCommunicationSpi.spi(ig0);
+
+        supplierSpi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+
+        startGrid(2);
+
+        long topVer = ig0.cluster().topologyVersion();
+
+        ig0.cluster().setBaselineTopology(topVer);
+
+        supplierSpi.waitForBlocked();
+
+        // The snapshot is started while the supply messages are still held back.
+        IgniteFuture<Void> snpFut = ig0.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        supplierSpi.stopBlock(true);
+
+        snpFut.get();
+
+        stopAllGrids();
+
+        IgniteEx restored = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+        checkPartitionMapExchangeFinished();
+
+        assertSnapshotCacheKeys(restored.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Checks that a snapshot can be taken into and restored from a directory which is configured
+     * explicitly through the snapshot path of the node configuration.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithExplicitPath() throws Exception {
+        File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+        try {
+            IgniteEx lastStarted = null;
+
+            for (int nodeIdx = 0; nodeIdx < 2; nodeIdx++) {
+                String instanceName = getTestIgniteInstanceName(nodeIdx);
+
+                IgniteConfiguration nodeCfg = optimize(getConfiguration(instanceName));
+
+                nodeCfg.setSnapshotPath(exSnpDir.getAbsolutePath());
+
+                lastStarted = startGrid(nodeCfg);
+            }
+
+            lastStarted.cluster().baselineAutoAdjustEnabled(false);
+            lastStarted.cluster().state(ACTIVE);
+
+            for (int key = 0; key < CACHE_KEYS_RANGE; key++)
+                lastStarted.cache(DEFAULT_CACHE_NAME).put(key, key);
+
+            IgniteFuture<Void> snpFut = lastStarted.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+            snpFut.get();
+
+            stopAllGrids();
+
+            IgniteEx restored = startGridsFromSnapshot(2, cfg -> exSnpDir.getAbsolutePath(), SNAPSHOT_NAME, true);
+
+            assertSnapshotCacheKeys(restored.cache(dfltCacheCfg.getName()));
+        }
+        finally {
+            // The directory is outside of the regular test cleanup, so it is removed here.
+            stopAllGrids();
+
+            U.delete(exSnpDir);
+        }
+    }
+
+    /**
+     * Checks the values of the snapshot metrics in three states: before any snapshot has been started,
+     * while a snapshot is in progress (including a concurrent snapshot request which is rejected on
+     * another node) and after the snapshot has finished successfully.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotMetrics() throws Exception {
+        String newSnapshotName = SNAPSHOT_NAME + "_new";
+        CountDownLatch deltaApply = new CountDownLatch(1);
+        CountDownLatch deltaBlock = new CountDownLatch(1);
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        MetricRegistry mreg0 = ignite.context().metric().registry(SNAPSHOT_METRICS);
+
+        LongMetric startTime = mreg0.findMetric("LastSnapshotStartTime");
+        LongMetric endTime = mreg0.findMetric("LastSnapshotEndTime");
+        ObjectGauge<String> snpName = mreg0.findMetric("LastSnapshotName");
+        ObjectGauge<String> errMsg = mreg0.findMetric("LastSnapshotErrorMessage");
+        ObjectGauge<List<String>> snpList = mreg0.findMetric("LocalSnapshotNames");
+
+        // Snapshot process will be blocked when delta partition files processing starts.
+        snp(ignite).localSnapshotSenderFactory(
+            blockingLocalSnapshotSender(ignite, deltaApply, deltaBlock));
+
+        assertEquals("Snapshot start time must be undefined prior to snapshot operation started.",
+            0, startTime.value());
+        assertEquals("Snapshot end time must be undefined to snapshot operation started.",
+            0, endTime.value());
+        assertTrue("Snapshot name must not exist prior to snapshot operation started.", snpName.value().isEmpty());
+        assertTrue("Snapshot error message must null prior to snapshot operation started.", errMsg.value().isEmpty());
+        assertTrue("Snapshots on local node must not exist", snpList.value().isEmpty());
+
+        long cutoffStartTime = U.currentTimeMillis();
+
+        IgniteFuture<Void> fut0 = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        U.await(deltaApply);
+
+        // The first snapshot is in progress now.
+        assertTrue("Snapshot start time must be set prior to snapshot operation started " +
+            "[startTime=" + startTime.value() + ", cutoffTime=" + cutoffStartTime + ']',
+            startTime.value() >= cutoffStartTime);
+        assertEquals("Snapshot end time must be zero prior to snapshot operation started.",
+            0, endTime.value());
+        assertEquals("Snapshot name must be set prior to snapshot operation started.",
+            SNAPSHOT_NAME, snpName.value());
+        assertTrue("Snapshot error message must null prior to snapshot operation started.",
+            errMsg.value().isEmpty());
+
+        // A concurrent request from the other node has to be rejected.
+        IgniteFuture<Void> fut1 = grid(1).snapshot().createSnapshot(newSnapshotName);
+
+        assertThrowsWithCause((Callable<Object>)fut1::get, IgniteException.class);
+
+        MetricRegistry mreg1 = grid(1).context().metric().registry(SNAPSHOT_METRICS);
+
+        LongMetric startTime1 = mreg1.findMetric("LastSnapshotStartTime");
+        LongMetric endTime1 = mreg1.findMetric("LastSnapshotEndTime");
+        ObjectGauge<String> snpName1 = mreg1.findMetric("LastSnapshotName");
+        ObjectGauge<String> errMsg1 = mreg1.findMetric("LastSnapshotErrorMessage");
+
+        assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
+            startTime1.value() > 0);
+        assertEquals("Snapshot end time must zero for failed on start snapshots.",
+            0, endTime1.value());
+        assertEquals("Snapshot name must be set when snapshot operation already finished.",
+            newSnapshotName, snpName1.value());
+        assertNotNull("Concurrent snapshot operation must failed.",
+            errMsg1.value());
+
+        // The absence of an error is reported as an empty string (see the checks above),
+        // so the not-null check alone would pass for a successful operation as well.
+        assertFalse("Error message of the rejected snapshot operation must not be empty.",
+            errMsg1.value().isEmpty());
+
+        deltaBlock.countDown();
+
+        fut0.get();
+
+        assertTrue("Snapshot start time must be greater than zero for finished snapshot.",
+            startTime.value() > 0);
+        assertTrue("Snapshot end time must be greater than zero for finished snapshot.",
+            endTime.value() > 0);
+        assertEquals("Snapshot name must be set when snapshot operation already finished.",
+            SNAPSHOT_NAME, snpName.value());
+        assertTrue("Concurrent snapshot operation must finished successfully.",
+            errMsg.value().isEmpty());
+        assertEquals("Only the first snapshot must be created and stored on disk.",
+            Collections.singletonList(SNAPSHOT_NAME), snpList.value());
+    }
+
+    /**
+     * Checks that creating a snapshot with a name containing characters outside of {@code a-zA-Z0-9_}
+     * fails: an {@link IllegalArgumentException} with the name-pattern message is expected somewhere
+     * in the cause chain of the thrown exception.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotIncorrectNameFails() throws Exception {
+        IgniteEx ignite = startGridsWithCache(1, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        assertThrowsAnyCause(log,
+            () -> ignite.snapshot().createSnapshot("--№=+.:(snapshot)").get(),
+            IllegalArgumentException.class,
+            "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
+    }
+
+    /**
+     * Starts three nodes with the default cache, stops the node with index {@code 2} and takes a cluster
+     * snapshot with the remaining nodes. Then restarts two nodes from that snapshot and checks that the
+     * expected cache keys are present and that the {@code idleVerify} result passes the partition comparison.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithOfflineBlt() throws Exception {
+        IgniteEx ignite = startGridsWithCache(3, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        stopGrid(2);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+        assertPartitionsSame(idleVerify(snp, dfltCacheCfg.getName()));
+    }
+
+
+    /**
+     * Checks snapshot creation and restore for two caches ({@code tx1} and {@code tx2}) that are assigned
+     * to the same cache group {@code "group"}: a snapshot is taken on a three-node cluster, the cluster is
+     * restarted from the snapshot and the expected keys are verified in both caches.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testClusterSnapshotWithSharedCacheGroup() throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg1 = txCacheConfig(new CacheConfiguration<>("tx1"));
+        CacheConfiguration<Integer, Integer> ccfg2 = txCacheConfig(new CacheConfiguration<>("tx2"));
+
+        ccfg1.setGroupName("group");
+        ccfg2.setGroupName("group");
+
+        IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, Integer::new, ccfg1, ccfg2);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        awaitPartitionMapExchange();
+
+        assertSnapshotCacheKeys(snp.cache(ccfg1.getName()));
+        assertSnapshotCacheKeys(snp.cache(ccfg2.getName()));
+    }
+
+    /**
+     * Wraps the local snapshot sender factory of the given node so that each produced sender, when asked
+     * to send a delta file, first counts down {@code started}, then waits on {@code blocked} and only after
+     * that delegates to the original delta sending. If the wait is interrupted, an {@link IgniteException}
+     * is thrown and the delta is not sent.
+     *
+     * @param ignite Ignite instance.
+     * @param started Latch will be released when delta partition processing starts.
+     * @param blocked Latch to await delta partition processing.
+     * @return Factory which produces local snapshot senders.
+     */
+    private Function<String, SnapshotSender> blockingLocalSnapshotSender(IgniteEx ignite,
+        CountDownLatch started,
+        CountDownLatch blocked
+    ) {
+        Function<String, SnapshotSender> old = snp(ignite).localSnapshotSenderFactory();
+
+        return (snpName) -> new DelegateSnapshotSender(log, snp(ignite).snapshotExecutorService(), old.apply(snpName)) {
+            @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+                if (log.isInfoEnabled())
+                    log.info("Processing delta file has been blocked: " + delta.getName());
+
+                started.countDown();
+
+                try {
+                    U.await(blocked);
+
+                    if (log.isInfoEnabled())
+                        log.info("Latch released. Processing delta file continued: " + delta.getName());
+
+                    super.sendDelta0(delta, cacheDirName, pair);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException("Interrupted by node stop", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the current value of {@code jvm}, which is declared elsewhere in this test class.
+     */
+    @Override protected boolean isMultiJvm() {
+        return jvm;
+    }
+
+    /**
+     * Sums the {@code balance} of the {@code Account} values stored under the keys {@code [0, keys)}
+     * in each of the given caches. The value returned by {@code cache.get(key)} is dereferenced without
+     * a null check.
+     *
+     * @param ignite Ignite instance.
+     * @param keys Number of keys to read from each cache, starting from {@code 0}.
+     * @param caches Cache names to read values.
+     * @return Summary value.
+     */
+    private static int sumAllCacheValues(Ignite ignite, int keys, String... caches) {
+        AtomicInteger total = new AtomicInteger();
+
+        for (String name : caches) {
+            IgniteCache<Integer, Account> cache = ignite.cache(name);
+
+            for (int key = 0; key < keys; key++)
+                total.addAndGet(cache.get(key).balance);
+        }
+
+        return total.get();
+    }
+
+    /**
+     * I/O factory which delegates file I/O creation to another factory and halts the JVM with
+     * {@code Ignition.KILL_EXIT_CODE} when the configured predicate accepts the file being opened.
+     */
+    private static class HaltJvmFileIOFactory implements FileIOFactory {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegate;
+
+        /** Condition to halt. */
+        private final Predicate<File> pred;
+
+        /**
+         * @param delegate Delegate factory.
+         * @param pred Condition to halt; tested against every file passed to {@link #create(File, OpenOption...)}.
+         */
+        public HaltJvmFileIOFactory(FileIOFactory delegate, Predicate<File> pred) {
+            this.delegate = delegate;
+            this.pred = pred;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The file I/O is created by the delegate factory first; the halt condition is checked afterwards.
+         */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            FileIO delegate = this.delegate.create(file, modes);
+
+            if (pred.test(file))
+                Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
+
+            return delegate;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
new file mode 100644
index 0000000..0aaa789
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.mxbean.SnapshotMXBean;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_METRICS;
+
+/**
+ * Tests {@link SnapshotMXBean}: a snapshot is started through the MX bean and its completion is observed
+ * through the {@code LastSnapshotEndTime} attribute of the snapshot metrics MBean.
+ */
+public class IgniteSnapshotMXBeanTest extends AbstractSnapshotSelfTest {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Additionally sets {@link JmxMetricExporterSpi} as the metric exporter of the node.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setMetricExporterSpi(new JmxMetricExporterSpi());
+    }
+
+    /**
+     * Creates a snapshot via {@link SnapshotMXBean#createSnapshot(String)}, waits up to 10 seconds for the
+     * last snapshot end time to become positive, then restarts the cluster from the snapshot and checks
+     * the expected cache keys.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testCreateSnapshot() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        DynamicMBean snpMBean = metricRegistry(ignite.name(), null, SNAPSHOT_METRICS);
+
+        assertEquals("Snapshot end time must be undefined on first snapshot operation starts.",
+            0, getLastSnapshotEndTime(snpMBean));
+
+        SnapshotMXBean mxBean = getMxBean(ignite.name(), "Snapshot", SnapshotMXBeanImpl.class, SnapshotMXBean.class);
+
+        mxBean.createSnapshot(SNAPSHOT_NAME);
+
+        assertTrue("Waiting for snapshot operation failed.",
+            GridTestUtils.waitForCondition(() -> getLastSnapshotEndTime(snpMBean) > 0, 10_000));
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
+
+        assertSnapshotCacheKeys(snp.cache(dfltCacheCfg.getName()));
+    }
+
+    /**
+     * Reads the {@code LastSnapshotEndTime} attribute, rethrowing JMX failures as {@link RuntimeException}.
+     * @param mBean Ignite snapshot MBean.
+     * @return Value of snapshot end time.
+     */
+    private static long getLastSnapshotEndTime(DynamicMBean mBean) {
+        try {
+            return (long)mBean.getAttribute("LastSnapshotEndTime");
+        }
+        catch (MBeanException | ReflectionException | AttributeNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
new file mode 100644
index 0000000..b957d48
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.resolveBinaryWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+
+/**
+ * Default snapshot manager test.
+ */
+public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotLocalPartitions() throws Exception {
+        IgniteEx ig = startGridsWithCache(1, 4096, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        for (int i = 4096; i < 8192; i++) {
+            ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, i) {
+                @Override public String toString() {
+                    return "_" + super.toString();
+                }
+            });
+        }
+
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+        IgniteSnapshotManager mgr = snp(ig);
+
+        // Collection of pairs group and appropriate cache partition to be snapshot.
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
+
+        snpFut.get();
+
+        File cacheWorkDir = ((FilePageStoreManager)ig.context()
+            .cache()
+            .context()
+            .pageStore())
+            .cacheWorkDir(dfltCacheCfg);
+
+        // Checkpoint forces on cluster deactivation (currently only single node in cluster),
+        // so we must have the same data in snapshot partitions and those which left
+        // after node stop.
+        stopGrid(ig.name());
+
+        // Calculate CRCs.
+        IgniteConfiguration cfg = ig.context().config();
+        PdsFolderSettings settings = ig.context().pdsFolderResolver().resolveFolders();
+        String nodePath = databaseRelativePath(settings.folderName());
+        File binWorkDir = resolveBinaryWorkDir(cfg.getWorkDirectory(), settings.folderName());
+        File marshWorkDir = mappingFileStoreWorkDir(U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome()));
+        File snpBinWorkDir = resolveBinaryWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath(), settings.folderName());
+        File snpMarshWorkDir = mappingFileStoreWorkDir(mgr.snapshotLocalDir(SNAPSHOT_NAME).getAbsolutePath());
+
+        final Map<String, Integer> origPartCRCs = calculateCRC32Partitions(cacheWorkDir);
+        final Map<String, Integer> snpPartCRCs = calculateCRC32Partitions(
+            FilePageStoreManager.cacheWorkDir(U.resolveWorkDirectory(mgr.snapshotLocalDir(SNAPSHOT_NAME)
+                    .getAbsolutePath(),
+                nodePath,
+                false),
+                cacheDirName(dfltCacheCfg)));
+
+        assertEquals("Partitions must have the same CRC after file copying and merging partition delta files",
+            origPartCRCs, snpPartCRCs);
+        assertEquals("Binary object mappings must be the same for local node and created snapshot",
+            calculateCRC32Partitions(binWorkDir), calculateCRC32Partitions(snpBinWorkDir));
+        assertEquals("Marshaller meta mast be the same for local node and created snapshot",
+            calculateCRC32Partitions(marshWorkDir), calculateCRC32Partitions(snpMarshWorkDir));
+
+        File snpWorkDir = mgr.snapshotTmpDir();
+
+        assertEquals("Snapshot working directory must be cleaned after usage", 0, snpWorkDir.listFiles().length);
+    }
+
+    /**
+     * Tests that all partitions are copied successfully even when multiple checkpoints occur during
+     * a long-running copy of cache partition files.
+     * <p>
+     * Partition sending is held back by the {@code slowCopy} latch while the test forces checkpoints and
+     * overwrites the data. Loader threads start writing once a checkpoint begins and the {@code processed}
+     * field of the snapshot task is non-empty.
+     * <p>
+     * Data consistency is checked by starting a node from the snapshot directory and verifying that every
+     * key in {@code [0, CACHE_KEYS_RANGE)} holds the balance written before the snapshot
+     * ({@code valMultiplier * key}).
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitionMultiCpWithLoad() throws Exception {
+        int valMultiplier = 2;
+        CountDownLatch slowCopy = new CountDownLatch(1);
+
+        // Start grid node with data before each test.
+        IgniteEx ig = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        GridCacheSharedContext<?, ?> cctx = ig.context().cache().context();
+        AtomicInteger cntr = new AtomicInteger();
+        CountDownLatch ldrLatch = new CountDownLatch(1);
+        IgniteSnapshotManager mgr = snp(ig);
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)cctx.database();
+
+        IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                U.await(ldrLatch);
+
+                while (!Thread.currentThread().isInterrupted())
+                    ig.cache(DEFAULT_CACHE_NAME).put(cntr.incrementAndGet(),
+                        new Account(cntr.incrementAndGet(), cntr.incrementAndGet()));
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                log.warning("Loader has been interrupted", e);
+            }
+        }, 5, "cache-loader-");
+
+        // Register the task without scheduling it on a checkpoint; part sending blocks until slowCopy is released.
+        SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+            cctx.localNodeId(),
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    try {
+                        U.await(slowCopy);
+
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            });
+
+        db.addCheckpointListener(new DbCheckpointListener() {
+            /** {@inheritDoc} */
+            @Override public void beforeCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onMarkCheckpointBegin(Context ctx) {
+                // No-op.
+            }
+
+            /**
+             * {@inheritDoc}
+             * <p>
+             * Releases the loader threads once the snapshot task's {@code processed} map is non-empty.
+             */
+            @Override public void onCheckpointBegin(Context ctx) {
+                Map<Integer, Set<Integer>> processed = GridTestUtils.getFieldValue(snpFutTask,
+                    SnapshotFutureTask.class,
+                    "processed");
+
+                if (!processed.isEmpty())
+                    ldrLatch.countDown();
+            }
+        });
+
+        try {
+            snpFutTask.start();
+
+            // Change data before snapshot creation which must be included into it with correct value multiplier.
+            for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+                ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, valMultiplier * i));
+
+            // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+            // due to checkpoint already running and we need to schedule the next one
+            // right after current will be completed.
+            cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, SNAPSHOT_NAME));
+
+            snpFutTask.awaitStarted();
+
+            db.forceCheckpoint("snapshot is ready to be created")
+                .futureFor(CheckpointState.MARKER_STORED_TO_DISK)
+                .get();
+
+            // Change data after snapshot; these values (3 * key) must not appear in the restored snapshot.
+            for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+                ig.cache(DEFAULT_CACHE_NAME).put(i, new Account(i, 3 * i));
+
+            // Snapshot on the next checkpoint must copy page to delta file before write it to a partition.
+            forceCheckpoint(ig);
+
+            slowCopy.countDown();
+
+            snpFutTask.get();
+        }
+        finally {
+            loadFut.cancel();
+        }
+
+        // Now can stop the node and check created snapshots.
+        stopGrid(0);
+
+        cleanPersistenceDir(ig.name());
+
+        // Start Ignite instance from snapshot directory.
+        IgniteEx ig2 = startGridsFromSnapshot(1, SNAPSHOT_NAME);
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++) {
+            assertEquals("snapshot data consistency violation [key=" + i + ']',
+                i * valMultiplier, ((Account)ig2.cache(DEFAULT_CACHE_NAME).get(i)).balance);
+        }
+    }
+
+    /**
+     * Checks that an I/O failure while writing a partition delta file is propagated to the snapshot future.
+     * <p>
+     * The snapshot manager gets an I/O factory which, for the file named
+     * {@code IgniteSnapshotManager.partDeltaFileName(0)}, throws an {@link IOException} on the third
+     * {@code writeFully} call counted across all such files. All keys are mapped to partition {@code 0}
+     * by {@link ZeroPartitionAffinityFunction}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotLocalPartitionNotEnoughSpace() throws Exception {
+        String err_msg = "Test exception. Not enough space.";
+        AtomicInteger throwCntr = new AtomicInteger();
+        RandomAccessFileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+        IgniteEx ig = startGridWithCache(dfltCacheCfg.setAffinity(new ZeroPartitionAffinityFunction()),
+            CACHE_KEYS_RANGE);
+
+        // Overwrite the data before the local snapshot task is started.
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            ig.cache(DEFAULT_CACHE_NAME).put(i, 2 * i);
+
+        GridCacheSharedContext<?, ?> cctx0 = ig.context().cache().context();
+
+        IgniteSnapshotManager mgr = snp(ig);
+
+        mgr.ioFactory(new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+                FileIO fileIo = ioFactory.create(file, modes);
+
+                if (file.getName().equals(IgniteSnapshotManager.partDeltaFileName(0)))
+                    return new FileIODecorator(fileIo) {
+                        @Override public int writeFully(ByteBuffer srcBuf) throws IOException {
+                            if (throwCntr.incrementAndGet() == 3)
+                                throw new IOException(err_msg);
+
+                            return super.writeFully(srcBuf);
+                        }
+                    };
+
+                return fileIo;
+            }
+        });
+
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
+
+        // Check the right exception thrown.
+        assertThrowsAnyCause(log,
+            snpFut::get,
+            IOException.class,
+            err_msg);
+    }
+
+    /**
+     * Checks that an {@link IgniteException} thrown by the snapshot sender while sending partition {@code 0}
+     * is reported through the local snapshot future. Only partition {@code 0} of the default cache is
+     * requested for the snapshot.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotCreateLocalCopyPartitionFail() throws Exception {
+        String err_msg = "Test. Fail to copy partition: ";
+        IgniteEx ig = startGridWithCache(dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        Map<Integer, Set<Integer>> parts = new HashMap<>();
+        parts.put(CU.cacheId(DEFAULT_CACHE_NAME), new HashSet<>(Collections.singletonList(0)));
+
+        IgniteSnapshotManager mgr0 = snp(ig);
+
+        IgniteInternalFuture<?> fut = startLocalSnapshotTask(ig.context().cache().context(),
+            SNAPSHOT_NAME,
+            parts,
+            new DelegateSnapshotSender(log, mgr0.snapshotExecutorService(),
+                mgr0.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    if (pair.getPartitionId() == 0)
+                        throw new IgniteException(err_msg + pair);
+
+                    delegate.sendPart0(part, cacheDirName, pair, length);
+                }
+            });
+
+        assertThrowsAnyCause(log,
+            fut::get,
+            IgniteException.class,
+            err_msg);
+    }
+
+    /**
+     * Starts a local snapshot task whose partition sending is blocked on a latch, destroys the cache being
+     * snapshotted, releases the latch and then waits up to 5 seconds for the snapshot future.
+     * <p>
+     * The test passes when an {@link IgniteCheckedException} escapes the method.
+     * NOTE(review): {@code expected} is satisfied by an exception thrown from any statement of the test,
+     * not only from {@code snpFut.get(...)} - consider asserting on the future explicitly.
+     *
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testLocalSnapshotOnCacheStopped() throws Exception {
+        IgniteEx ig = startGridWithCache(dfltCacheCfg, CACHE_KEYS_RANGE);
+
+        startGrid(1);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        GridCacheSharedContext<?, ?> cctx0 = ig.context().cache().context();
+        IgniteSnapshotManager mgr = snp(ig);
+
+        CountDownLatch cpLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
+            SNAPSHOT_NAME,
+            F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
+                @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    try {
+                        U.await(cpLatch);
+
+                        delegate.sendPart0(part, cacheDirName, pair, length);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            });
+
+        IgniteCache<?, ?> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        cache.destroy();
+
+        cpLatch.countDown();
+
+        snpFut.get(5_000, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * For each of the given cache groups collects the ids of partitions which, according to the topology
+     * of {@code src}, are in the {@link GridDhtPartitionState#OWNING} state on the node {@code rmtNodeId}.
+     * <p>
+     * NOTE(review): no caller of this helper is visible in this class - confirm it is still needed.
+     *
+     * @param src Source node to calculate.
+     * @param grps Groups to collect owning parts.
+     * @param rmtNodeId Remote node id.
+     * @return Map of collected parts.
+     */
+    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, Set<Integer> grps, UUID rmtNodeId) {
+        Map<Integer, Set<Integer>> result = new HashMap<>();
+
+        for (Integer grpId : grps) {
+            Set<Integer> parts = src.context()
+                .cache()
+                .cacheGroup(grpId)
+                .topology()
+                .partitions(rmtNodeId)
+                .entrySet()
+                .stream()
+                .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+
+            result.put(grpId, parts);
+        }
+
+        return result;
+    }
+
+    /**
+     * Registers a snapshot task for the local node, starts it, forces a checkpoint with the snapshot reason
+     * and waits until the task reports that it has started.
+     *
+     * @param cctx Shared cache context of the node the snapshot is taken on.
+     * @param snpName Unique snapshot name.
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param snpSndr Sender which used for snapshot sub-task processing.
+     * @return Future which will be completed when snapshot is done.
+     * @throws IgniteCheckedException If the task fails to start.
+     */
+    private static SnapshotFutureTask startLocalSnapshotTask(
+        GridCacheSharedContext<?, ?> cctx,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        SnapshotSender snpSndr
+    ) throws IgniteCheckedException {
+        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, snpSndr);
+
+        snpFutTask.start();
+
+        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+        // due to checkpoint already running and we need to schedule the next one
+        // right after current will be completed.
+        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
+
+        snpFutTask.awaitStarted();
+
+        return snpFutTask;
+    }
+
+    /** Rendezvous affinity function whose {@link #partition(Object)} maps every key to partition {@code 0}. */
+    private static class ZeroPartitionAffinityFunction extends RendezvousAffinityFunction {
+        @Override public int partition(Object key) {
+            return 0;
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index 701c68d0..bd7b913 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -46,6 +46,7 @@ import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.MemoryMetrics;
 import org.apache.ignite.PersistenceMetrics;
@@ -616,6 +617,13 @@ public class IgfsIgniteMock implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSnapshot snapshot() {
+        throwUnsupported();
+
+        return null; // NOTE(review): presumably unreachable if throwUnsupported() always throws - confirm.
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<MemoryMetrics> memoryMetrics() {
         return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 8c59e20..38da757 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -75,6 +75,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 null,
                 new WalStateManager(null),
                 new IgniteCacheDatabaseSharedManager(),
+                null,
                 new IgniteCacheSnapshotManager(),
                 new GridCacheDeploymentManager<K, V>(),
                 new GridCachePartitionExchangeManager<K, V>(),
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
index 1c25310..ce7da5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformDeployServiceTask.java
@@ -18,6 +18,11 @@
 package org.apache.ignite.platform;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
@@ -34,12 +39,6 @@ import org.apache.ignite.services.ServiceContext;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import static java.util.Calendar.JANUARY;
 
 /**
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index f4cc3ff..d0ee646 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -17,12 +17,6 @@
 
 package org.apache.ignite.testframework;
 
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.management.Attribute;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -72,6 +66,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.management.Attribute;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -497,9 +497,8 @@ public final class GridTestUtils {
      * @param cls Exception class.
      * @param msg Exception message (optional). If provided exception message
      *      and this message should be equal.
-     * @return Thrown throwable.
      */
-    public static Throwable assertThrowsAnyCause(@Nullable IgniteLogger log, Callable<?> call,
+    public static void assertThrowsAnyCause(@Nullable IgniteLogger log, Callable<?> call,
         Class<? extends Throwable> cls, @Nullable String msg) {
         assert call != null;
         assert cls != null;
@@ -515,7 +514,7 @@ public final class GridTestUtils {
                     if (log != null && log.isInfoEnabled())
                         log.info("Caught expected exception: " + t.getMessage());
 
-                    return t;
+                    return;
                 }
 
                 t = t.getCause();
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d025e38..949ec4e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -158,9 +158,10 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
-import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 import static org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER;
 import static org.apache.ignite.testframework.config.GridTestProperties.IGNITE_CFG_PREPROCESSOR_CLS;
 
@@ -643,7 +644,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
      */
     private void resolveWorkDirectory() throws Exception {
         U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", true);
-        U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", true);
+        U.resolveWorkDirectory(U.defaultWorkDirectory(), BINARY_META_FOLDER, true);
     }
 
     /** */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 0448e9b..43bc9de 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -49,6 +49,7 @@ import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.MemoryMetrics;
 import org.apache.ignite.PersistenceMetrics;
@@ -494,6 +495,11 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSnapshot snapshot() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<MemoryMetrics> memoryMetrics() {
         return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 9394760..bd52959 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -138,6 +138,7 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
+import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.BINARY_META_FOLDER;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -843,7 +844,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     /**
      * @param c Cache proxy.
      */
-    protected void printPartitionState(IgniteCache<?, ?> c) {
+    protected static void printPartitionState(IgniteCache<?, ?> c) {
         printPartitionState(c.getConfiguration(CacheConfiguration.class).getName(), 0);
     }
 
@@ -853,7 +854,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      *
      * Print partitionState for cache.
      */
-    protected void printPartitionState(String cacheName, int firstParts) {
+    protected static void printPartitionState(String cacheName, int firstParts) {
         StringBuilder sb = new StringBuilder();
 
         sb.append("----preload sync futures----\n");
@@ -1863,7 +1864,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
         U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "cp", false));
         U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false));
         U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "marshaller", false));
-        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), "binary_meta", false));
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), BINARY_META_FOLDER, false));
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 9932f1f..636f810 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -57,6 +57,7 @@ import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
@@ -826,6 +827,11 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSnapshot snapshot() {
+        throw new UnsupportedOperationException("Operation isn't supported yet.");
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<MemoryMetrics> memoryMetrics() {
         return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 6d797b4..cf1c881 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -35,6 +35,9 @@ import org.apache.ignite.internal.encryption.MasterKeyChangeConsistencyCheckTest
 import org.apache.ignite.internal.encryption.MasterKeyChangeTest;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointReadLockFailureTest;
 import org.apache.ignite.internal.processors.cache.persistence.SingleNodePersistenceSslTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
 import org.apache.ignite.marshaller.GridMarshallerMappingConsistencyTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassTest;
 import org.apache.ignite.util.GridCommandHandlerClusterByClassWithSSLTest;
@@ -80,7 +83,11 @@ import org.junit.runners.Suite;
     MasterKeyChangeTest.class,
     MasterKeyChangeConsistencyCheckTest.class,
 
-    EncryptionMXBeanTest.class
+    EncryptionMXBeanTest.class,
+
+    IgniteSnapshotManagerSelfTest.class,
+    IgniteClusterSnapshotSelfTest.class,
+    IgniteSnapshotMXBeanTest.class
 })
 public class IgniteBasicWithPersistenceTestSuite {
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
index 0f54e30..76dd922 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/metric/SqlViewExporterSpiTest.java
@@ -156,11 +156,11 @@ public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
         for (List<?> row : res) {
             names.add((String)row.get(0));
 
-            assertNotNull(row.get(1));
+            assertNotNull("Metric value must be not null [name=" + row.get(0) + ']', row.get(1));
         }
 
         for (String attr : EXPECTED_ATTRIBUTES)
-            assertTrue(attr + " should be exporterd via SQL view", names.contains(attr));
+            assertTrue(attr + " should be exported via SQL view", names.contains(attr));
     }
 
     /** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
new file mode 100644
index 0000000..a993911
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotWithIndexesTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Test;
+
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
+
+/**
+ * Cluster-wide snapshot test with indexes.
+ */
+public class IgniteClusterSnapshotWithIndexesTest extends AbstractSnapshotSelfTest {
+    /** Configuration with statically configured indexes. */
+    private final CacheConfiguration<Integer, Account> indexedCcfg =
+        txCacheConfig(new CacheConfiguration<Integer, Account>("indexed"))
+            .setQueryEntities(Collections.singletonList(
+                new QueryEntity(Integer.class.getName(), Account.class.getName())
+                    .addQueryField("id", Integer.class.getName(), null)
+                    .addQueryField("balance", Integer.class.getName(), null)
+                    .setIndexes(F.asList(new QueryIndex("id"),
+                        new QueryIndex("balance")))));
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getDataStorageConfiguration().setCheckpointFrequency(DFLT_CHECKPOINT_FREQ);
+
+        return cfg;
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotWithIndexes() throws Exception {
+        String tblName = "Person";
+        IgniteEx ignite = startGridsWithCache(3, CACHE_KEYS_RANGE, key -> new Account(key, key), indexedCcfg);
+
+        executeSql(ignite, "CREATE TABLE " + tblName + " (id int, name varchar, age int, city varchar, " +
+            "primary key (id, name)) WITH \"cache_name=" + tblName + "\"");
+        executeSql(ignite, "CREATE INDEX ON " + tblName + "(city, age)");
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            executeSql(ignite, "INSERT INTO " + tblName + " (id, name, age, city) VALUES(?, 'name', 3, 'city')", i);
+
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(ignite, selectStartSQLStatement(tblName))));
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(ignite.context().cache().jcache(indexedCcfg.getName()),
+            selectStartSQLStatement(Account.class.getSimpleName()))));
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
+            .get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(3, SNAPSHOT_NAME);
+
+        assertTrue(snp.cache(indexedCcfg.getName()).indexReadyFuture().isDone());
+        assertTrue(snp.cache(tblName).indexReadyFuture().isDone());
+
+        List<List<?>> results = executeSql(snp, explainSQLStatement(tblName) + "id > 10");
+
+        // The plan must mention the primary key index and must not contain a scan.
+        String explainPlan = (String)results.get(0).get(0);
+        assertTrue(explainPlan, explainPlan.toUpperCase().contains("\"_KEY_PK"));
+        assertFalse(explainPlan, explainPlan.toUpperCase().contains("_SCAN_"));
+
+        results = executeSql(snp, explainSQLStatement(tblName) + "city='city' and age=2");
+        assertUsingSecondaryIndex(results);
+
+        results = executeSql(snp.context().cache().jcache(indexedCcfg.getName()),
+            explainSQLStatement(Account.class.getSimpleName()) + "id=0");
+        assertUsingSecondaryIndex(results);
+
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(snp, selectStartSQLStatement(tblName))));
+        assertEquals(CACHE_KEYS_RANGE, rowsCount(executeSql(snp.context().cache().jcache(indexedCcfg.getName()),
+            selectStartSQLStatement(Account.class.getSimpleName()))));
+
+        forceCheckpoint();
+
+        // Validate indexes of both caches on every node started from the snapshot.
+        ValidateIndexesClosure clo = new ValidateIndexesClosure(new HashSet<>(Arrays.asList(indexedCcfg.getName(), tblName)),
+            0, 0, false, true);
+
+        for (Ignite node : G.allGrids()) {
+            ((IgniteEx)node).context().resource().injectGeneric(clo);
+
+            assertFalse("Index validation found issues [node=" + node.name() + ']', clo.call().hasIssues());
+        }
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testClusterSnapshotConsistentConfig() throws Exception {
+        String tblName = "PersonCache";
+        int grids = 3;
+
+        IgniteEx ignite = startGridsWithoutCache(grids);
+
+        executeSql(ignite, "CREATE TABLE " + tblName + " (id int, name varchar, age int, city varchar, " +
+            "primary key (id, name)) WITH \"cache_name=" + tblName + "\"");
+        executeSql(ignite, "CREATE INDEX SNP_IDX_0 ON " + tblName + "(age)");
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            executeSql(ignite, "INSERT INTO " + tblName + " (id, name, age, city) VALUES(?, 'name', 3, 'city')", i);
+
+        // Give the local snapshot sender of every node a blocking executor, released below via unblock().
+        List<BlockingExecutor> execs = new ArrayList<>();
+
+        for (Ignite grid : G.allGrids()) {
+            IgniteSnapshotManager mgr = snp((IgniteEx)grid);
+            Function<String, SnapshotSender> old = mgr.localSnapshotSenderFactory();
+
+            BlockingExecutor block = new BlockingExecutor(mgr.snapshotExecutorService());
+            execs.add(block);
+
+            mgr.localSnapshotSenderFactory((snpName) ->
+                new DelegateSnapshotSender(log, block, old.apply(snpName)));
+        }
+
+        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+
+        List<String> idxNames = Arrays.asList("SNP_IDX_1", "SNP_IDX_2");
+
+        executeSql(ignite, "CREATE INDEX " + idxNames.get(0) + " ON " + tblName + "(city)");
+        executeSql(ignite, "CREATE INDEX " + idxNames.get(1) + " ON " + tblName + "(age, city)");
+
+        for (BlockingExecutor exec : execs)
+            exec.unblock();
+
+        fut.get();
+
+        stopAllGrids();
+
+        IgniteEx snp = startGridsFromSnapshot(grids, SNAPSHOT_NAME);
+
+        List<String> currIdxNames = executeSql(snp, "SELECT INDEX_NAME FROM SYS.INDEXES").stream()
+            .map(l -> (String)l.get(0))
+            .collect(Collectors.toList());
+
+        assertTrue("Concurrently created indexes must not exist in the snapshot: " + currIdxNames,
+            Collections.disjoint(idxNames, currIdxNames));
+
+        List<List<?>> results = executeSql(snp, explainSQLStatement(tblName) + "age=2");
+        assertUsingSecondaryIndex(results);
+    }
+
+    /**
+     * @param name Table name.
+     * @return {@code SELECT count(*)} statement for the given table.
+     */
+    private static String selectStartSQLStatement(String name) {
+        return "SELECT count(*) FROM " + name;
+    }
+
+    /**
+     * @param name Table name.
+     * @return Explain statement prefix ending with {@code WHERE}; the caller must append the condition.
+     */
+    private static String explainSQLStatement(String name) {
+        return "explain SELECT * FROM " + name + " WHERE ";
+    }
+
+    /**
+     * @param ignite Ignite instance to execute the query on.
+     * @param stmt SQL statement to run.
+     * @param args Arguments set on the query via {@code setArgs}.
+     * @return All rows of the query result.
+     */
+    private static List<List<?>> executeSql(IgniteEx ignite, String stmt, Object... args) {
+        return ignite.context().query().querySqlFields(new SqlFieldsQuery(stmt).setArgs(args), true).getAll();
+    }
+
+    /**
+     * @param cache Cache to run the query through.
+     * @param stmt SQL statement to run (without arguments).
+     * @return All rows of the query result.
+     */
+    private static List<List<?>> executeSql(IgniteCache<?, ?> cache, String stmt) {
+        return cache.query(new SqlFieldsQuery(stmt)).getAll();
+    }
+
+    /**
+     * @param res Statement results; the first column of the first row must hold a {@code Long}.
+     * @return Row count read from the first column of the first row.
+     */
+    private static long rowsCount(List<List<?>> res) {
+        return (Long)res.get(0).get(0);
+    }
+
+    /** @param results Result of an explain query; the plan is read from the first column of the first row. */
+    private static void assertUsingSecondaryIndex(List<List<?>> results) {
+        String explainPlan = (String)results.get(0).get(0);
+        // Locale-independent upper-casing: e.g. a Turkish default locale does not map "idx" to "IDX".
+        String upperPlan = explainPlan.toUpperCase(java.util.Locale.ROOT);
+
+        assertTrue(explainPlan, upperPlan.contains("_IDX"));
+        assertFalse(explainPlan, upperPlan.contains("_SCAN_"));
+    }
+
+    /** Executor that parks submitted tasks until {@link #unblock()} is called, then hands them to the delegate. */
+    private static class BlockingExecutor implements Executor {
+        /** Delegate executor. */
+        private final Executor delegate;
+
+        /** Waiting tasks. Guarded by {@code this}. */
+        private final Queue<Runnable> tasks = new ArrayDeque<>();
+
+        /** {@code true} if tasks must be blocked. Guarded by {@code this}. */
+        private boolean block = true;
+
+        /**
+         * @param delegate Delegate executor.
+         */
+        public BlockingExecutor(Executor delegate) {
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void execute(@NotNull Runnable cmd) {
+            // Check-then-enqueue must be atomic with unblock(), otherwise a task could stay parked forever.
+            if (block)
+                tasks.offer(cmd);
+            else
+                delegate.execute(cmd);
+        }
+
+        /** Unblock and schedule the parked tasks for execution in their submission order. */
+        public synchronized void unblock() {
+            block = false;
+
+            Runnable r;
+
+            while ((r = tasks.poll()) != null)
+                delegate.execute(r);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 3f69426..2e39c34 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexi
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest;
 import org.apache.ignite.internal.processors.database.IgniteDbMultiNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingPutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeWithIndexingWalRestoreTest;
@@ -48,7 +49,8 @@ import org.junit.runners.Suite;
     RebuildIndexWithHistoricalRebalanceTest.class,
     IndexingMultithreadedLoadContinuousRestartTest.class,
     LongDestroyDurableBackgroundTaskTest.class,
-    RebuildIndexTest.class
+    RebuildIndexTest.class,
+    IgniteClusterSnapshotWithIndexesTest.class
 })
 public class IgnitePdsWithIndexingTestSuite {
 }
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 0f3645d..38c5a6b 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -317,6 +317,11 @@ public class IgniteSpringBean implements Ignite, DisposableBean, SmartInitializi
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSnapshot snapshot() {
+        return g.snapshot();
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<MemoryMetrics> memoryMetrics() {
         return DataRegionMetricsAdapter.collectionOf(dataRegionMetrics());
     }