Posted to commits@ignite.apache.org by mm...@apache.org on 2021/11/22 07:39:57 UTC

[ignite] 02/02: IGNITE-14744 Restore snapshot taken on different topologies (#9539)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 0a7ccfa12dee96f4eee11fa2a78399fdde8890a8
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Nov 20 16:14:25 2021 +0300

    IGNITE-14744 Restore snapshot taken on different topologies (#9539)
---
 .../communication/AbstractTransmission.java        |   4 +-
 .../managers/communication/FileReceiver.java       |   3 +-
 .../managers/communication/GridIoManager.java      |  28 +-
 .../communication/GridIoMessageFactory.java        |   4 +
 .../communication/TransmissionHandler.java         |   7 +-
 .../processors/cache/CacheGroupContext.java        |   2 +-
 .../processors/cache/ClusterCachesInfo.java        |   4 +-
 .../internal/processors/cache/ExchangeActions.java |   4 +-
 .../persistence/file/FilePageStoreManager.java     |  30 +-
 .../snapshot/AbstractSnapshotFutureTask.java       | 136 +++
 .../snapshot/AbstractSnapshotMessage.java          | 108 +++
 .../snapshot/IgniteSnapshotManager.java            | 872 +++++++++++++++++--
 .../snapshot/IgniteSnapshotVerifyException.java    |   3 +
 .../snapshot/SnapshotFilesFailureMessage.java      | 134 +++
 .../snapshot/SnapshotFilesRequestMessage.java      | 174 ++++
 ...eption.java => SnapshotFinishedFutureTask.java} |  38 +-
 .../persistence/snapshot/SnapshotFutureTask.java   | 117 +--
 .../persistence/snapshot/SnapshotMetadata.java     |  57 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |   9 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java | 172 ++++
 .../snapshot/SnapshotRestoreProcess.java           | 962 +++++++++++++++------
 .../util/distributed/DistributedProcess.java       |   5 +
 .../apache/ignite/internal/util/lang/GridFunc.java |   2 +-
 .../snapshot/AbstractSnapshotSelfTest.java         |  90 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   |   1 +
 .../IgniteClusterSnapshotRestoreBaseTest.java      |  36 +
 .../IgniteClusterSnapshotRestoreSelfTest.java      |  77 +-
 .../snapshot/IgniteSnapshotManagerSelfTest.java    |   5 +-
 .../snapshot/IgniteSnapshotRemoteRequestTest.java  | 336 +++++++
 .../IgniteSnapshotRestoreFromRemoteTest.java       | 376 ++++++++
 .../IgniteBasicWithPersistenceTestSuite.java       |   4 +
 ...niteClusterSnapshotRestoreWithIndexingTest.java |  48 +-
 32 files changed, 3303 insertions(+), 545 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
index bd1da54..aed45f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
@@ -57,9 +57,9 @@ abstract class AbstractTransmission implements Closeable {
         int chunkSize
     ) {
         A.notNull(meta, "Initial file meta cannot be null");
-        A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
+        A.notNullOrEmpty(meta.name(), "Transmission name cannot be empty or null");
         A.ensure(meta.offset() >= 0, "File start position cannot be negative");
-        A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
+        A.ensure(meta.count() >= 0, "Total number of bytes to transfer can't be less than zero");
         A.notNull(stopChecker, "Process stop checker cannot be null");
         A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
index 6af3ca4..c826e4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
@@ -82,7 +82,8 @@ class FileReceiver extends TransmissionReceiver {
             fileIo.position(meta.offset());
         }
         catch (IOException e) {
-            throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
+            throw new IgniteException("Unable to open destination file. Receiver will be stopped: " +
+                file.getAbsolutePath(), e);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6b4395b..3476192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -970,7 +970,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                                     if (nodeId.equals(e.getValue().rmtNodeId)) {
                                         it.remove();
 
-                                        interruptRecevier(e.getValue(),
+                                        interruptReceiver(e.getValue(),
                                             new ClusterTopologyCheckedException("Remote node left the grid. " +
                                                 "Receiver has been stopped : " + nodeId));
                                     }
@@ -1179,7 +1179,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
 
             for (ReceiverContext rctx : rcvs) {
-                interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+                interruptReceiver(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
                     + ctx.localNodeId()));
             }
         }
@@ -1965,7 +1965,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             rcvCtx0 = rcvCtxs.remove(topic);
         }
 
-        interruptRecevier(rcvCtx0,
+        interruptReceiver(rcvCtx0,
             new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
                 "on local node [nodeId=" + ctx.localNodeId() + ']'));
     }
@@ -2787,7 +2787,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param rctx Receiver context to use.
      * @param ex Exception to close receiver with.
      */
-    private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+    private void interruptReceiver(ReceiverContext rctx, Exception ex) {
         if (rctx == null)
             return;
 
@@ -2800,9 +2800,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             rctx.lastState = rctx.lastState == null ?
                 new TransmissionMeta(ex) : rctx.lastState.error(ex);
 
-            rctx.hnd.onException(rctx.rmtNodeId, ex);
+            if (X.hasCause(ex, TransmissionCancelledException.class)) {
+                if (log.isInfoEnabled())
+                    log.info("Transmission receiver has been cancelled [rctx=" + rctx + ']');
+            }
+            else
+                U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex);
 
-            U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
+            rctx.hnd.onException(rctx.rmtNodeId, ex);
         }
     }
 
@@ -2856,7 +2861,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     "It's not allowed to process different sessions over the same topic simultaneously. " +
                     "Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
 
-                U.error(log, err);
+                U.error(log, "Error has been sent back to remote node. Receiver holds the local topic " +
+                    "[topic=" + topic + ", rmtNodeId=" + rmtNodeId + ", ctx=" + rcvCtx + ']', err);
 
                 out.writeObject(new TransmissionMeta(err));
 
@@ -2881,17 +2887,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
                     receiveFromChannel(topic, rcvCtx, in, out, ch);
                 else
-                    interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+                    interruptReceiver(rcvCtxs.remove(topic), rcvCtx.lastState.error());
             }
             finally {
                 rcvCtx.lock.unlock();
             }
         }
         catch (Throwable t) {
-            U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
-
             // Do not remove receiver context here, since sender will recconect to get this error.
-            interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
+            interruptReceiver(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
         }
         finally {
             U.closeQuiet(in);
@@ -2991,7 +2995,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
 
                 @Override public void onTimeout() {
-                    interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
+                    interruptReceiver(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
                         "waiting for the reconnect has been timeouted"));
                 }
             });
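
The interruptReceiver() change above downgrades user-driven cancellation to an INFO log entry by walking the exception's cause chain (X.hasCause) before deciding how loudly to report it. A minimal standalone sketch of that cause-chain check with plain JDK logging, assuming a hypothetical CancelledException marker type rather than Ignite's TransmissionCancelledException:

    import java.util.logging.Level;
    import java.util.logging.Logger;

    final class CauseChainLogging {
        /** Hypothetical marker standing in for a cancellation exception. */
        static class CancelledException extends RuntimeException {
            CancelledException(String msg) { super(msg); }
        }

        /** Returns true if any throwable in the cause chain is an instance of the given type. */
        static boolean hasCause(Throwable t, Class<? extends Throwable> cls) {
            for (Throwable cur = t; cur != null; cur = cur.getCause()) {
                if (cls.isInstance(cur))
                    return true;
            }

            return false;
        }

        /** Logs cancellations at INFO and real failures at SEVERE, mirroring the receiver shutdown path. */
        static void logInterrupt(Logger log, Throwable ex) {
            if (hasCause(ex, CancelledException.class))
                log.info("Transmission receiver has been cancelled: " + ex.getMessage());
            else
                log.log(Level.SEVERE, "Receiver has been interrupted due to an exception", ex);
        }
    }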
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d39f7c3..567cf7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,6 +136,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -371,6 +373,8 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
         factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
         factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
         factory.register((short)177, TcpInverseConnectionResponseMessage::new);
+        factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new);
+        factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new);
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] - this
         // [120..123] - DR
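
The two register() calls above bind the new snapshot messages' type codes to their no-arg constructors so the communication layer can instantiate them during deserialization. A rough sketch of such a type-code registry with plain JDK types, assuming a hypothetical Msg marker interface instead of Ignite's Message/MessageFactoryProvider:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    final class MsgFactory {
        /** Hypothetical marker interface for wire messages. */
        interface Msg { /* readFrom/writeTo would live here. */ }

        /** Type code to constructor mapping. */
        private final Map<Short, Supplier<? extends Msg>> byType = new HashMap<>();

        /** Registers a constructor for the given type code; duplicate codes are rejected. */
        void register(short typeCode, Supplier<? extends Msg> ctor) {
            if (byType.putIfAbsent(typeCode, ctor) != null)
                throw new IllegalStateException("Type code already registered: " + typeCode);
        }

        /** Creates a new, empty message instance for the given type code. */
        Msg create(short typeCode) {
            Supplier<? extends Msg> ctor = byType.get(typeCode);

            if (ctor == null)
                throw new IllegalArgumentException("Unknown message type code: " + typeCode);

            return ctor.get();
        }
    }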
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
index da0dd69..a3c9f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -74,13 +74,16 @@ public interface TransmissionHandler {
      *
      * @param nodeId Remote node id from which request has been received.
      * @param initMeta Initial handler meta info.
-     * @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
+     * @return Instance of read handler to process incoming data like the {@link FileChannel} manner.
      */
     public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
 
     /**
+     * The {@link TransmissionCancelledException} will be passed to the exception handler if the local transmission
+     * is ended by a user interruption request.
+     *
      * @param nodeId Remote node id on which the error occurred.
-     * @param err The err of fail handling process.
+     * @param err The error that caused the handling process to fail.
      */
     public void onException(UUID nodeId, Throwable err);
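
Given the contract documented above, an implementation should treat a cancellation-driven onException() as a normal, quiet end of the session rather than a failure. A sketch of that distinction with hypothetical stand-in types (Meta instead of TransmissionMeta, a local CancelledException marker); only the two callbacks visible in this diff are modelled, not the full TransmissionHandler interface:

    import java.io.File;
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    final class ReceiverCallbacks {
        /** Hypothetical stand-in for the transmission meta information. */
        static class Meta { /* offset, count, params... */ }

        /** Hypothetical stand-in for a cancellation exception. */
        static class CancelledException extends RuntimeException {
            CancelledException(String msg) { super(msg); }
        }

        /** Completed when the whole transmission ends, normally or not. */
        final CompletableFuture<Void> done = new CompletableFuture<>();

        /** Returns the consumer invoked for every fully received file. */
        Consumer<File> fileHandler(UUID nodeId, Meta initMeta) {
            return file -> System.out.println("Received " + file.getName() + " from " + nodeId);
        }

        /** Cancellation ends the session quietly; anything else is a real failure. */
        void onException(UUID nodeId, Throwable err) {
            if (err instanceof CancelledException)
                done.complete(null);      // User-requested stop, not an error.
            else
                done.completeExceptionally(err);
        }
    }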
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 6b7b06c..7d6eec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -639,7 +639,7 @@ public class CacheGroupContext {
     /**
      * @return Cache shared context.
      */
-    public GridCacheSharedContext shared() {
+    public GridCacheSharedContext<?, ?> shared() {
         return ctx;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 2841e02..b8bd968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -995,6 +995,8 @@ public class ClusterCachesInfo {
         DynamicCacheChangeRequest req,
         String cacheName
     ) {
+        assert exchangeActions != null;
+
         CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
 
         IgniteCheckedException err = null;
@@ -1038,7 +1040,7 @@ public class ClusterCachesInfo {
         if (err == null && req.restartId() == null) {
             IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
 
-            if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) {
+            if (snapshotMgr.isRestoring(ccfg)) {
                 err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
                     "currently being restored from a snapshot [cache=" + cacheName +
                     (ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 8736a88..3b93244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -446,10 +446,10 @@ public class ExchangeActions {
      *
      */
     public static class CacheGroupActionData {
-        /** */
+        /** Cache group descriptor. */
         private final CacheGroupDescriptor desc;
 
-        /** */
+        /** Destroy flag. */
         private final boolean destroy;
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7e2ff26..4601ec0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -134,6 +134,12 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
 
     /** */
+    public static final Predicate<File> DATA_DIR_FILTER = dir ->
+        dir.getName().startsWith(CACHE_DIR_PREFIX) ||
+        dir.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+        dir.getName().equals(MetaStorage.METASTORAGE_DIR_NAME);
+
+    /** */
     public static final String CACHE_DATA_FILENAME = "cache_data.dat";
 
     /** */
@@ -1015,16 +1021,34 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
         if (files == null)
             return Collections.emptyList();
 
-        return Arrays.stream(dir.listFiles())
+        return Arrays.stream(files)
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
-                f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
+            .filter(DATA_DIR_FILTER)
             .filter(f -> names.test(cacheGroupName(f)))
             .collect(Collectors.toList());
     }
 
     /**
+     * @param dir Directory to check.
+     * @param grpId Cache group id.
+     * @return Cache or cache group directory matching the given group id, or {@code null} if none is found.
+     */
+    public static File cacheDirectory(File dir, int grpId) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return null;
+
+        return Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(DATA_DIR_FILTER)
+            .filter(f -> CU.cacheId(cacheGroupName(f)) == grpId)
+            .findAny()
+            .orElse(null);
+    }
+
+    /**
      * @param partFileName Partition file name.
      * @return Partition id.
      */
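
The new cacheDirectory() above resolves a cache group id back to its on-disk directory by filtering the node's work directory with DATA_DIR_FILTER and matching the hashed group name. A standalone sketch of the same lookup, assuming a hypothetical nameToId hash in place of CU.cacheId, the 'cacheGroup-' prefix shown above, and a 'cache-' prefix for single caches:

    import java.io.File;
    import java.util.Arrays;
    import java.util.function.Predicate;

    final class CacheDirs {
        /** Accepts only directories whose names carry a cache or cache group prefix. */
        static final Predicate<File> DATA_DIR_FILTER = dir ->
            dir.getName().startsWith("cache-") || dir.getName().startsWith("cacheGroup-");

        /** Hypothetical stand-in for CU.cacheId: a stable hash of the cache (group) name. */
        static int nameToId(String name) {
            return name.hashCode();
        }

        /** Strips the prefix up to the first dash to recover the cache or cache group name. */
        static String cacheGroupName(File dir) {
            String name = dir.getName();

            return name.substring(name.indexOf('-') + 1);
        }

        /** Finds the directory of the cache group with the given id, or null if none exists. */
        static File cacheDirectory(File workDir, int grpId) {
            File[] files = workDir.listFiles();

            if (files == null)
                return null;

            return Arrays.stream(files)
                .filter(File::isDirectory)
                .filter(DATA_DIR_FILTER)
                .filter(f -> nameToId(cacheGroupName(f)) == grpId)
                .findAny()
                .orElse(null);
        }
    }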
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
new file mode 100644
index 0000000..b4c65af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+    /** Shared context. */
+    protected final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Node id which caused the snapshot operation. */
+    protected final UUID srcNodeId;
+
+    /** Unique identifier of snapshot process. */
+    protected final String snpName;
+
+    /** Snapshot working directory on file system. */
+    protected final File tmpSnpWorkDir;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    protected final FileIOFactory ioFactory;
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    protected final SnapshotSender snpSndr;
+
+    /** Partitions to be processed. */
+    protected final Map<Integer, Set<Integer>> parts;
+
+    /** An exception which occurred during snapshot processing. */
+    protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which caused the snapshot task creation.
+     * @param snpName Unique identifier of snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory for working with snapshot files.
+     * @param snpSndr Factory which produces snapshot receiver instance.
+     * @param parts Partitions to be processed.
+     */
+    protected AbstractSnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not null.";
+
+        this.cctx = cctx;
+        this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+        this.srcNodeId = srcNodeId;
+        this.snpName = snpName;
+        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+        this.ioFactory = ioFactory;
+        this.snpSndr = snpSndr;
+        this.parts = parts;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Node id which triggers this operation.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * Initiates snapshot task.
+     *
+     * @return {@code true} if task started by this call.
+     */
+    public abstract boolean start();
+
+    /**
+     * @param th An exception which occurred during snapshot processing.
+     */
+    public abstract void acceptException(Throwable th);
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() {
+        // Cancellation of snapshot future should not throw an exception.
+        acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
+            "by external process [snpName=" + snpName + ']'));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AbstractSnapshotFutureTask.class, this);
+    }
+}
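
cancel() above deliberately never throws: it funnels cancellation through acceptException() so every termination path completes the task's future the same way. A minimal sketch of that pattern on top of CompletableFuture, using hypothetical names; Ignite's GridFutureAdapter is not involved here:

    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    abstract class SketchTask<T> extends CompletableFuture<T> {
        /** First error accepted during processing; later errors are ignored. */
        protected final AtomicReference<Throwable> err = new AtomicReference<>();

        /** Starts the task; returns true if this call actually started it. */
        public abstract boolean start();

        /** Records the error once and completes the future exceptionally. */
        public void acceptException(Throwable th) {
            if (err.compareAndSet(null, th))
                completeExceptionally(th);
        }

        /** Cancellation never throws: it is just another error path. */
        @Override public boolean cancel(boolean mayInterruptIfRunning) {
            acceptException(new CancellationException("Task has been cancelled by external process"));

            return true;
        }
    }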
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
new file mode 100644
index 0000000..e76ca16
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+abstract class AbstractSnapshotMessage implements Message {
+    /** Unique request id. */
+    private String reqId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected AbstractSnapshotMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Unique request id.
+     */
+    protected AbstractSnapshotMessage(String reqId) {
+        assert U.alphanumericUnderscore(reqId) : reqId;
+
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Unique request id.
+     */
+    public String requestId() {
+        return reqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 0) {
+            if (!writer.writeString("reqId", reqId))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (reader.state() == 0) {
+            reqId = reader.readString("reqId");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(AbstractSnapshotMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AbstractSnapshotMessage.class, this);
+    }
+}
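
writeTo()/readFrom() above follow Ignite's resumable direct-marshalling style: a state counter tracks how many fields have already gone over the wire, so a call interrupted by a full buffer can simply be retried and continue where it stopped. A simplified standalone sketch of that idea over a raw ByteBuffer, with hypothetical names and a single int field:

    import java.nio.ByteBuffer;

    /** Writes one int field into possibly-too-small buffers, resuming across calls. */
    final class ResumableWriter {
        /** How many fields have been fully written so far. */
        private int state;

        /** The single field to serialize. */
        private final int value;

        ResumableWriter(int value) {
            this.value = value;
        }

        /** Returns true when everything is written; false means "call again once there is more room". */
        boolean writeTo(ByteBuffer buf) {
            if (state == 0) {
                if (buf.remaining() < Integer.BYTES)
                    return false; // Not enough room: keep state and resume on the next call.

                buf.putInt(value);

                state++; // Field 0 is done and will never be rewritten.
            }

            return true;
        }
    }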
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7fb70ca..7207d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -36,6 +36,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -50,26 +51,40 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.SnapshotEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFeatures;
@@ -78,6 +93,12 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -117,6 +138,7 @@ import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -153,6 +175,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
 import static org.apache.ignite.internal.MarshallerContextImpl.resolveMappingFileStoreWorkDir;
 import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
@@ -171,6 +194,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
 import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
@@ -209,6 +233,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Text Reason for checkpoint to start snapshot operation. */
     public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
 
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
     /** Default snapshot directory for loading remote snapshots. */
     public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
 
@@ -216,8 +243,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
 
     /** Error message to finalize snapshot tasks. */
-    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " +
-        "is stopping";
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "The operation is cancelled due to the local node is stopping";
 
     /** Metastorage key to save currently running snapshot. */
     public static final String SNP_RUNNING_KEY = "snapshot-running";
@@ -240,6 +266,27 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Total number of thread to perform local snapshot. */
     private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
 
+    /** Default snapshot topic to receive snapshots from remote node. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of the cache directory which is currently sending its partitions. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String RQ_ID_NAME_PARAM = "rqId";
+
+    /** Total number of snapshot files which the receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
     /**
      * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
      * It is important to have only one buffer per thread (instead of creating each buffer per
@@ -249,7 +296,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     private final ThreadLocal<ByteBuffer> locBuff;
 
     /** Map of registered cache snapshot processes and their corresponding contexts. */
-    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, AbstractSnapshotFutureTask<?>> locSnpTasks = new ConcurrentHashMap<>();
 
     /** Lock to protect the resources is used. */
     private final GridBusyLock busyLock = new GridBusyLock();
@@ -278,6 +325,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Local snapshot sender factory. */
     private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
 
+    /** Remote snapshot sender factory. */
+    private BiFunction<String, UUID, SnapshotSender> rmtSndrFactory = this::remoteSnapshotSenderFactory;
+
     /** Main snapshot directory to save created snapshots. */
     private volatile File locSnpDir;
 
@@ -314,6 +364,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Snapshot operation handlers. */
     private final SnapshotHandlers handlers = new SnapshotHandlers();
 
+    /** Manager to receive responses of remote snapshot requests. */
+    private final SequentialRemoteSnapshotManager snpRmtMgr;
+
     /**
      * @param ctx Kernal context.
      */
@@ -331,6 +384,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
 
         restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+
+        // Manage remote snapshots.
+        snpRmtMgr = new SequentialRemoteSnapshotManager();
     }
 
     /**
@@ -405,6 +461,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 "the configured via IgniteConfiguration snapshot working path.");
 
         cctx.exchange().registerExchangeAwareComponent(this);
+
         ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
@@ -427,19 +484,23 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                         endSnpProc.start(snpReq.requestId(), snpReq);
                     }
 
-                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                    for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
                         if (sctx.sourceNodeId().equals(leftNodeId) ||
                             (reqNodeLeft && snpReq.snapshotName().equals(sctx.snapshotName())))
                             sctx.acceptException(new ClusterTopologyCheckedException(err));
                     }
 
                     restoreCacheGrpProc.onNodeLeft(leftNodeId);
+                    snpRmtMgr.onNodeLeft(leftNodeId);
                 }
             }
             finally {
                 busyLock.leaveBusy();
             }
         }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
+        cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
     }
 
     /** {@inheritDoc} */
@@ -450,11 +511,13 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
 
             // Try stop all snapshot processing if not yet.
-            for (SnapshotFutureTask sctx : locSnpTasks.values())
+            for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
                 sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
 
             locSnpTasks.clear();
 
+            snpRmtMgr.stop();
+
             synchronized (snpOpMux) {
                 if (clusterSnpFut != null) {
                     clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -466,6 +529,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             if (snpRunner != null)
                 snpRunner.shutdownNow();
 
+            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
             if (discoLsnr != null)
                 cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
 
@@ -639,7 +705,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             parts.put(grpId, null);
         }
 
-        IgniteInternalFuture<Set<GroupPartitionId>> task0;
+        IgniteInternalFuture<?> task0;
 
         if (parts.isEmpty() && !withMetaStorage)
             task0 = new GridFinishedFuture<>(Collections.emptySet());
@@ -650,7 +716,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 withMetaStorage,
                 locSndrFactory.apply(req.snapshotName()));
 
-            if (withMetaStorage) {
+            if (withMetaStorage && task0 instanceof SnapshotFutureTask) {
                 ((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
                     .suspend(((SnapshotFutureTask)task0).started());
             }
@@ -681,7 +747,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                     cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
                     grpIds,
                     blts,
-                    fut.result());
+                    (Set<GroupPartitionId>)fut.result());
 
                 try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
                     U.marshal(marsh, meta, out);
@@ -903,12 +969,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /**
      * Check if the cache or group with the specified name is currently being restored from the snapshot.
      *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param ccfg Cache configuration.
      * @return {@code True} if the cache or group with the specified name is being restored.
      */
-    public boolean isRestoring(String cacheName, @Nullable String grpName) {
-        return restoreCacheGrpProc.isRestoring(cacheName, grpName);
+    public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+        return restoreCacheGrpProc.isRestoring(ccfg);
     }
 
     /**
@@ -980,7 +1045,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         busyLock.enterBusy();
 
         try {
-            for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+            for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
                 if (sctx.snapshotName().equals(name))
                     sctx.cancel();
             }
@@ -1415,7 +1480,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
         SnapshotOperationRequest snpReq = clusterSnpReq;
 
-        SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName());
+        AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
 
         if (task == null)
             return;
@@ -1427,7 +1492,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             try {
                 long start = U.currentTimeMillis();
 
-                task.started().get();
+                ((SnapshotFutureTask)task).started().get();
 
                 if (log.isInfoEnabled()) {
                     log.info("Finished waiting for a synchronized checkpoint under topology lock " +
@@ -1441,12 +1506,45 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     }
 
     /**
+     * @param rmtNodeId The remote node to connect to.
+     * @param snpName Snapshot name to request.
+     * @param parts Cache groups and the corresponding cache partitions to be snapshotted.
+     * @param stopChecker Process interrupt checker.
+     * @param partHnd Handler invoked for each received partition file.
+     * @return Future which completes when all requested snapshot files have been received.
+     */
+    public IgniteInternalFuture<Void> requestRemoteSnapshotFiles(
+        UUID rmtNodeId,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        BooleanSupplier stopChecker,
+        BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+    ) throws IgniteCheckedException {
+        assert U.alphanumericUnderscore(snpName) : snpName;
+        assert partHnd != null;
+
+        ClusterNode rmtNode = cctx.discovery().node(rmtNodeId);
+
+        if (rmtNode == null) {
+            throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+        }
+
+        if (!nodeSupports(rmtNode, PERSISTENCE_CACHE_SNAPSHOT))
+            throw new IgniteCheckedException("Snapshot on remote node is not supported: " + rmtNode.id());
+
+        RemoteSnapshotFilesRecevier fut = new RemoteSnapshotFilesRecevier(this, rmtNodeId, snpName, parts, stopChecker, partHnd);
+
+        snpRmtMgr.submit(fut);
+
+        return fut;
+    }
+
+    /**
      * @param grps List of cache groups which will be destroyed.
      */
     public void onCacheGroupsStopped(List<Integer> grps) {
-        for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+        for (AbstractSnapshotFutureTask<?> sctx : F.view(locSnpTasks.values(), t -> t instanceof SnapshotFutureTask)) {
             Set<Integer> retain = new HashSet<>(grps);
-            retain.retainAll(sctx.affectedCacheGroups());
+
+            retain.retainAll(((SnapshotFutureTask)sctx).affectedCacheGroups());
 
             if (!retain.isEmpty()) {
                 sctx.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
@@ -1572,59 +1670,63 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
      * @param snpSndr Factory which produces snapshot receiver instance.
      * @return Snapshot operation task which should be registered on checkpoint to run.
      */
-    SnapshotFutureTask registerSnapshotTask(
+    AbstractSnapshotFutureTask<?> registerSnapshotTask(
         String snpName,
         UUID srcNodeId,
         Map<Integer, Set<Integer>> parts,
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
-        if (!busyLock.enterBusy()) {
-            return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
-        }
-
-        try {
-            if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+        AbstractSnapshotFutureTask<?> task = registerTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName,
+            tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff));
 
-            SnapshotFutureTask snpFutTask;
+        if (!withMetaStorage) {
+            for (Integer grpId : parts.keySet()) {
+                if (!cctx.cache().isEncrypted(grpId))
+                    continue;
 
-            SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
-                snpFutTask = new SnapshotFutureTask(cctx,
-                    srcNodeId,
-                    snpName,
-                    tmpWorkDir,
-                    ioFactory,
-                    snpSndr,
-                    parts,
-                    withMetaStorage,
-                    locBuff));
+                task.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
+                    " but doesn't include metastore. Metastore is required because it contains encryption keys " +
+                    "required to start with encrypted caches contained in the snapshot."));
 
-            if (prev != null)
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+                return task;
+            }
+        }
 
-            if (!withMetaStorage) {
-                for (Integer grpId : parts.keySet()) {
-                    if (!cctx.cache().isEncrypted(grpId))
-                        continue;
+        return task;
+    }
 
-                    snpFutTask.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
-                        " but doesn't include metastore. Metastore is required because it contains encryption keys " +
-                        "required to start with encrypted caches contained in the snapshot."));
+    /**
+     * @param rqId Unique request id the task is registered under.
+     * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+     */
+    private AbstractSnapshotFutureTask<?> registerTask(String rqId, AbstractSnapshotFutureTask<?> task) {
+        if (!busyLock.enterBusy()) {
+            return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+                cctx.localNodeId() + ']'));
+        }
 
-                    return snpFutTask;
-                }
+        try {
+            if (locSnpTasks.containsKey(rqId)) {
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
             }
 
+            AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, task);
+
+            if (prev != null)
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
+
             if (log.isInfoEnabled()) {
                 log.info("Snapshot task has been registered on local node [sctx=" + this +
+                    ", task=" + task.getClass().getSimpleName() +
                     ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
             }
 
-            snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+            task.listen(f -> locSnpTasks.remove(rqId));
 
-            return snpFutTask;
+            return task;
         }
         finally {
             busyLock.leaveBusy();
@@ -1645,6 +1747,26 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         return locSndrFactory;
     }
 
+    /**
+     * @param factory Factory which produces {@link RemoteSnapshotSender} implementation.
+     */
+    void remoteSnapshotSenderFactory(BiFunction<String, UUID, SnapshotSender> factory) {
+        rmtSndrFactory = factory;
+    }
+
+    /**
+     * @param rqId Request id.
+     * @param nodeId Node id.
+     * @return Snapshot sender related to given node id.
+     */
+    RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
+        return new RemoteSnapshotSender(log,
+            snpRunner,
+            databaseRelativePath(pdsSettings.folderName()),
+            cctx.gridIO().openTransmissionSender(nodeId, DFLT_INITIAL_SNAPSHOT_TOPIC),
+            rqId);
+    }
+
     /** Snapshot finished successfully or already restored. Key can be removed. */
     private void removeLastMetaStorageKey() throws IgniteCheckedException {
         cctx.database().checkpointReadLock();
@@ -1700,6 +1822,18 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     }
 
     /**
+     * @param nodeId Remote node id for which requests have been registered.
+     * @return Snapshot future related to given node id.
+     */
+    AbstractSnapshotFutureTask<?> lastScheduledSnapshotResponseRemoteTask(UUID nodeId) {
+        return locSnpTasks.values().stream()
+            .filter(t -> t instanceof SnapshotResponseRemoteFutureTask)
+            .filter(t -> t.sourceNodeId().equals(nodeId))
+            .findFirst()
+            .orElse(null);
+    }
+
+    /**
      * @return Relative configured path of persistence data storage directory for the local node.
      * Example: {@code snapshotWorkDir/db/IgniteNodeName0}
      */
@@ -1883,6 +2017,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 return new SnapshotHandlerResult<>(hnd.invoke(ctx), null, ctx.localNode());
             }
             catch (Exception e) {
+                U.error(null, "Error invoking snapshot handler", e);
+
                 return new SnapshotHandlerResult<>(null, e, ctx.localNode());
             }
         }
@@ -2115,6 +2251,642 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Unique id of this remote snapshot request. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which shows how many partitions are left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception that occurred while receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to all requested partitions being processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
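
The receiver above reports every received partition file through its BiConsumer<File, Throwable> handler (the file with a null error on success, a null file with the error on failure) and polls the BooleanSupplier before handling each file so the caller can cancel the transfer. A minimal, self-contained sketch of such callbacks; the target directory and the cancel flag below are assumptions for illustration, not part of the commit:

    import java.io.File;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.BiConsumer;
    import java.util.function.BooleanSupplier;

    class PartitionCallbacksSketch {
        public static void main(String[] args) {
            // Caller-managed cancel flag; the receiver polls it before handling each file.
            AtomicBoolean cancelled = new AtomicBoolean(false);
            BooleanSupplier stopChecker = cancelled::get;

            // Hypothetical directory the received partition files should be moved to.
            Path targetDir = Paths.get("work", "restored-group");

            // Called once per received partition file, or once with the error on failure.
            BiConsumer<File, Throwable> partHnd = (part, err) -> {
                if (err != null) {
                    System.err.println("Snapshot transfer failed: " + err.getMessage());

                    return;
                }

                try {
                    Files.createDirectories(targetDir);
                    Files.move(part.toPath(), targetDir.resolve(part.getName()),
                        StandardCopyOption.REPLACE_EXISTING);
                }
                catch (Exception e) {
                    // Ask the receiver to stop before the next partition is handled.
                    cancelled.set(true);
                }
            };

            // partHnd and stopChecker would be handed over when the snapshot files are requested;
            // the receiver owns their invocation, one call per partition file.
        }
    }

Flipping the flag makes the next acceptFile(...) call raise a TransmissionCancelledException, which is how the transfer gets interrupted.
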
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * is processed asynchronously, but strictly one at a time.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** The task currently being executed; it must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            if (stopping) {
+                next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                if (active != null)
+                    active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                RemoteSnapshotFilesRecevier r;
+
+                while ((r = queue.poll()) != null)
+                    r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                return;
+            }
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId Id of the node that left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of which may already be completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
+
+            RemoteSnapshotFilesRecevier active0 = active;
+
+            if (active0 != null)
+                futs.add(active0);
+
+            return futs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    try {
+                        synchronized (this) {
+                            AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                            if (task != null) {
+                                // Task will also be removed from local map due to the listener on future done.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                            }
+                        }
+
+                        AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                            new SnapshotResponseRemoteFutureTask(cctx,
+                                nodeId,
+                                snpName,
+                                tmpWorkDir,
+                                ioFactory,
+                                rmtSndrFactory.apply(rqId, nodeId),
+                                reqMsg0.parts()));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process a request to create a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Failed to send the response message about a snapshot request " +
+                                    "processing error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    catch (Throwable t) {
+                        U.error(log, "Error processing the snapshot files request message " +
+                            "[request=" + reqMsg0 + ", nodeId=" + nodeId + ']', t);
+
+                        cctx.gridIO().sendToCustomTopic(nodeId,
+                            DFLT_INITIAL_SNAPSHOT_TOPIC,
+                            new SnapshotFilesFailureMessage(reqMsg0.requestId(), t.getMessage()),
+                            SYSTEM_POOL);
+                    }
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing of a snapshot request from a remote node failed with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEnd(UUID nodeId) {
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null)
+                return;
+
+            assert task.partsLeft.get() == 0 : task;
+            assert task.rmtNodeId.equals(nodeId);
+
+            if (log.isInfoEnabled()) {
+                log.info("Requested snapshot from remote node has been fully received " +
+                    "[rqId=" + task.reqId + ", task=" + task + ']');
+            }
+
+            task.onDone((Void)null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onException(UUID nodeId, Throwable ex) {
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null)
+                return;
+
+            assert task.rmtNodeId.equals(nodeId);
+
+            task.acceptException(ex);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+            Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+            String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+            String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+            String rqId = (String)fileMeta.params().get(RQ_ID_NAME_PARAM);
+            Integer partsCnt = (Integer)fileMeta.params().get(SNP_PARTITIONS_CNT);
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + fileMeta + ", task=" + task + ']');
+            }
+
+            assert task.reqId.equals(rqId) && task.rmtNodeId.equals(nodeId) :
+                "Another transmission in progress [task=" + task + ", rqId=" + rqId + ']';
+
+            busyLock.enterBusy();
+
+            try {
+                task.partsLeft.compareAndSet(-1, partsCnt);
+
+                File cacheDir = U.resolveWorkDirectory(task.dir.toString(),
+                    Paths.get(rmtDbNodePath, cacheDirName).toString(),
+                    false);
+
+                return Paths.get(cacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+            throw new UnsupportedOperationException("Loading file by chunks is not supported: " + nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+            Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+            Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+            String rqId = (String)initMeta.params().get(RQ_ID_NAME_PARAM);
+
+            assert grpId != null;
+            assert partId != null;
+            assert rqId != null;
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + initMeta + ", task=" + task + ']');
+            }
+
+            return new Consumer<File>() {
+                @Override public void accept(File file) {
+                    RemoteSnapshotFilesRecevier task0 = active;
+
+                    if (task0 == null || !task0.equals(task) || task0.isDone()) {
+                        throw new TransmissionCancelledException("Snapshot request is cancelled [rqId=" + rqId +
+                            ", grpId=" + grpId + ", partId=" + partId + ']');
+                    }
+
+                    busyLock.enterBusy();
+
+                    try {
+                        if (stopping)
+                            throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+
+                        task0.acceptFile(file);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * Such an executor does not run tasks in a single dedicated thread, but executes them
+     * on different threads strictly sequentially. This is important for some {@link SnapshotSender}s
+     * which must process their sub-tasks sequentially, since all of these sub-tasks may share
+     * a single socket channel used to send data.
+     */
+    private static class SequentialExecutorWrapper implements Executor {
+        /** Ignite logger. */
+        private final IgniteLogger log;
+
+        /** Queue of tasks to execute. */
+        private final Queue<Runnable> tasks = new ArrayDeque<>();
+
+        /** Delegate executor. */
+        private final Executor executor;
+
+        /** Currently running task. */
+        private volatile Runnable active;
+
+        /** If wrapped executor is shutting down. */
+        private volatile boolean stopping;
+
+        /**
+         * @param log Ignite logger.
+         * @param executor Executor to run tasks on.
+         */
+        public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
+            this.log = log.getLogger(SequentialExecutorWrapper.class);
+            this.executor = executor;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void execute(final Runnable r) {
+            assert !stopping : "Task must be cancelled prior to the wrapped executor shutting down.";
+
+            tasks.offer(() -> {
+                try {
+                    r.run();
+                }
+                finally {
+                    scheduleNext();
+                }
+            });
+
+            if (active == null)
+                scheduleNext();
+        }
+
+        /** */
+        private synchronized void scheduleNext() {
+            if ((active = tasks.poll()) != null) {
+                try {
+                    executor.execute(active);
+                }
+                catch (RejectedExecutionException e) {
+                    tasks.clear();
+
+                    stopping = true;
+
+                    log.warning("Task is outdated. Wrapped executor is shutting down.", e);
+                }
+            }
+        }
+    }
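
A self-contained sketch of the same wrapping idea on top of a plain java.util.concurrent pool; it illustrates the pattern only (the class above is private to the manager and additionally handles the stopping flag and rejected tasks):

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Runs submitted tasks one at a time on top of a multi-threaded delegate pool. */
    class SerializingExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor delegate;
        private Runnable active;

        SerializingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override public synchronized void execute(Runnable r) {
            tasks.offer(() -> {
                try {
                    r.run();
                }
                finally {
                    scheduleNext();
                }
            });

            if (active == null)
                scheduleNext();
        }

        /** Hands the next queued task, if any, to the delegate pool. */
        private synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null)
                delegate.execute(active);
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            Executor seq = new SerializingExecutor(pool);
            CountDownLatch done = new CountDownLatch(5);

            // Although the pool has 4 threads, these tasks never overlap in time.
            for (int i = 0; i < 5; i++) {
                int n = i;

                seq.execute(() -> {
                    System.out.println("task " + n + " on " + Thread.currentThread().getName());
                    done.countDown();
                });
            }

            done.await();
            pool.shutdown();
        }
    }

Even though the delegate pool has several threads, each queued task is handed to it only after the previous one finishes, so a socket channel shared by the sub-tasks is never written to concurrently.
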
+
+    /**
+     * Snapshot sender which streams partition files of a locally stored snapshot to a remote node.
+     */
+    private static class RemoteSnapshotSender extends SnapshotSender {
+        /** The sender which sends files to remote node. */
+        private final GridIoManager.TransmissionSender sndr;
+
+        /** Unique request id. */
+        private final String rqId;
+
+        /** Local node persistent directory with consistent id. */
+        private final String relativeNodePath;
+
+        /** The number of cache partition files expected to be processed. */
+        private int partsCnt;
+
+        /**
+         * @param log Ignite logger.
+         * @param exec Executor to run sending tasks on.
+         * @param relativeNodePath Relative persistent node path (including the consistent id).
+         * @param sndr File sender instance.
+         * @param rqId Unique request id.
+         */
+        public RemoteSnapshotSender(
+            IgniteLogger log,
+            Executor exec,
+            String relativeNodePath,
+            GridIoManager.TransmissionSender sndr,
+            String rqId
+        ) {
+            super(log, new SequentialExecutorWrapper(log, exec));
+
+            this.sndr = sndr;
+            this.rqId = rqId;
+            this.relativeNodePath = relativeNodePath;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            this.partsCnt = partsCnt;
+
+            if (F.isEmpty(relativeNodePath))
+                throw new IgniteException("Relative node path cannot be empty.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long len) {
+            try {
+                assert part.exists();
+                assert len > 0 : "Requested partition has an incorrect file length " +
+                    "[pair=" + pair + ", cacheDirName=" + cacheDirName + ']';
+
+                sndr.send(part, 0, len, transmissionParams(rqId, cacheDirName, pair), TransmissionPolicy.FILE);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Partition file has been sent [part=" + part.getName() + ", pair=" + pair +
+                        ", length=" + len + ']');
+                }
+            }
+            catch (TransmissionCancelledException e) {
+                if (log.isInfoEnabled()) {
+                    log.info("Transmission of the partition file has been interrupted [part=" + part.getName() +
+                        ", pair=" + pair + ']');
+                }
+            }
+            catch (IgniteCheckedException | InterruptedException | IOException e) {
+                U.error(log, "Error sending partition file [part=" + part.getName() + ", pair=" + pair +
+                    ", length=" + len + ']', e);
+
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+            throw new UnsupportedOperationException("Sending files by chunks of data is not supported: " + delta.getAbsolutePath());
+        }
+
+        /**
+         * @param rqId Unique request id.
+         * @param cacheDirName Cache directory name.
+         * @param pair Cache group id with corresponding partition id.
+         * @return Map of params.
+         */
+        private Map<String, Serializable> transmissionParams(String rqId, String cacheDirName,
+            GroupPartitionId pair) {
+            Map<String, Serializable> params = new HashMap<>();
+
+            params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
+            params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
+            params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
+            params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
+            params.put(RQ_ID_NAME_PARAM, rqId);
+            params.put(SNP_PARTITIONS_CNT, partsCnt);
+
+            return params;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close0(@Nullable Throwable th) {
+            U.closeQuiet(sndr);
+
+            if (th == null) {
+                if (log.isInfoEnabled())
+                    log.info("The remote snapshot sender closed normally [snpName=" + rqId + ']');
+            }
+            else {
+                U.warn(log, "The remote snapshot sender closed due to an error that occurred while processing " +
+                    "snapshot operation [snpName=" + rqId + ']', th);
+            }
+        }
+    }
+
     /**
      * Snapshot sender which writes all data to local directory.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
index bcaea42..ac42ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Compound snapshot verification exception from the nodes where the verification process executed.
@@ -36,6 +37,8 @@ public class IgniteSnapshotVerifyException extends IgniteException {
      * @param map Map of received exceptions.
      */
     public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
+        super(F.first(map.values()));
+
         exs.putAll(map);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
new file mode 100644
index 0000000..3cb9056
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message indicating that a failure occurred while processing a snapshot files request.
+ */
+public class SnapshotFilesFailureMessage extends AbstractSnapshotMessage {
+    /** Snapshot response message type (value is {@code 179}). */
+    public static final short TYPE_CODE = 179;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message of the exception which occurred during snapshot request processing. */
+    private String errMsg;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public SnapshotFilesFailureMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request id to which this response relates.
+     * @param errMsg Response error message.
+     */
+    public SnapshotFilesFailureMessage(String reqId, String errMsg) {
+        super(reqId);
+
+        this.errMsg = errMsg;
+    }
+
+    /**
+     * @return Response error message.
+     */
+    public String errorMessage() {
+        return errMsg;
+    }
+
+    /**
+     * @param errMsg Response error message.
+     * @return {@code this} for chaining.
+     */
+    public SnapshotFilesFailureMessage errorMessage(String errMsg) {
+        this.errMsg = errMsg;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 1) {
+            if (!writer.writeString("errMsg", errMsg))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (reader.state() == 1) {
+            errMsg = reader.readString("errMsg");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SnapshotFilesFailureMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotFilesFailureMessage.class, this, super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
new file mode 100644
index 0000000..65d1345
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message to request a set of snapshot cache group partition files from a remote node.
+ */
+public class SnapshotFilesRequestMessage extends AbstractSnapshotMessage {
+    /** Snapshot request message type (value is {@code 178}). */
+    public static final short TYPE_CODE = 178;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Snapshot name to request. */
+    private String snpName;
+
+    /** Map of cache group ids to the corresponding sets of partition ids. */
+    @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+    private Map<Integer, int[]> parts;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public SnapshotFilesRequestMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Unique request id.
+     * @param snpName Snapshot name.
+     * @param parts Map of cache group ids to the corresponding sets of partition ids to include into the snapshot.
+     */
+    public SnapshotFilesRequestMessage(String reqId, String snpName, Map<Integer, Set<Integer>> parts) {
+        super(reqId);
+
+        assert parts != null && !parts.isEmpty();
+
+        this.snpName = snpName;
+        this.parts = new HashMap<>();
+
+        for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+            this.parts.put(e.getKey(), U.toIntArray(e.getValue()));
+    }
+
+    /**
+     * @return The requested partition ids per cache group.
+     */
+    public Map<Integer, Set<Integer>> parts() {
+        Map<Integer, Set<Integer>> res = new HashMap<>();
+
+        for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
+            res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
+                .boxed()
+                .collect(Collectors.toSet()));
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Requested snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 1) {
+            if (!writer.writeString("snpName", snpName))
+                return false;
+
+            writer.incrementState();
+        }
+
+        if (writer.state() == 2) {
+            if (!writer.writeMap("parts", parts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (reader.state() == 1) {
+            snpName = reader.readString("snpName");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        if (reader.state() == 2) {
+            parts = reader.readMap("parts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SnapshotFilesRequestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotFilesRequestMessage.class, this, super.toString());
+    }
+}
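
For reference, the packing the request message applies to its partition map, sketched as plain Java: each Set<Integer> is stored as an int[] for the direct message writer, and on the receiving side an empty array is restored as null. The group ids and partition numbers below are arbitrary examples:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    class PartsPackingSketch {
        public static void main(String[] args) {
            // Requested partitions per cache group id.
            Map<Integer, Set<Integer>> req = new HashMap<>();

            req.put(1001, new LinkedHashSet<>(Arrays.asList(0, 1, 5)));
            req.put(1002, new LinkedHashSet<>());

            // Pack: Set<Integer> -> int[], the form the message keeps for direct marshalling.
            Map<Integer, int[]> packed = new HashMap<>();

            req.forEach((grp, parts) -> packed.put(grp, parts.stream().mapToInt(Integer::intValue).toArray()));

            // Unpack on the receiving side: an empty array is restored as null.
            Map<Integer, Set<Integer>> unpacked = new HashMap<>();

            packed.forEach((grp, arr) -> unpacked.put(grp,
                arr.length == 0 ? null : Arrays.stream(arr).boxed().collect(Collectors.toSet())));

            System.out.println(unpacked); // e.g. {1001=[0, 1, 5], 1002=null}
        }
    }
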
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
index bcaea42..561bb16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
@@ -17,32 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-
-/**
- * Compound snapshot verification exception from the nodes where the verification process executed.
- */
-public class IgniteSnapshotVerifyException extends IgniteException {
-    /** Serial version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** Map of received exceptions. */
-    private final Map<ClusterNode, Exception> exs = new HashMap<>();
+import org.apache.ignite.IgniteCheckedException;
 
+/** */
+public class SnapshotFinishedFutureTask extends AbstractSnapshotFutureTask<Void> {
     /**
-     * @param map Map of received exceptions.
+     * @param e Exception that completes this already-finished snapshot task.
      */
-    public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
-        exs.putAll(map);
+    public SnapshotFinishedFutureTask(IgniteCheckedException e) {
+        super(null, null, null, null, null, null, null);
+
+        onDone(e);
     }
 
-    /**
-     * @return Map of received exceptions.
-     */
-    public Map<ClusterNode, Exception> exceptions() {
-        return exs;
+    /** {@inheritDoc} */
+    @Override public boolean start() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptException(Throwable th) {
+        onDone(th);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 2fefe35..97f7d72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,7 +46,6 @@ import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -87,33 +85,20 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
 
 /**
- *
+ * The requested map of cache groups and their partitions to include into the snapshot, represented as <tt>Map<Integer, Set<Integer>></tt>.
+ * If the set of partitions is {@code null} then all OWNING partitions of the given cache group will be included into the snapshot.
+ * In this case, if all partitions have the OWNING state, the index partition will also be included.
+ * <p>
+ * If partitions for a particular cache group are not provided, they will be collected and added
+ * on checkpoint under the write-lock.
  */
-class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implements CheckpointListener {
-    /** Shared context. */
-    private final GridCacheSharedContext<?, ?> cctx;
-
+class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener {
     /** File page store manager for accessing cache group associated files. */
     private final FilePageStoreManager pageStore;
 
-    /** Ignite logger. */
-    private final IgniteLogger log;
-
-    /** Node id which cause snapshot operation. */
-    private final UUID srcNodeId;
-
-    /** Unique identifier of snapshot process. */
-    private final String snpName;
-
-    /** Snapshot working directory on file system. */
-    private final File tmpSnpWorkDir;
-
     /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
     private final ThreadLocal<ByteBuffer> locBuff;
 
-    /** IO factory which will be used for creating snapshot delta-writers. */
-    private final FileIOFactory ioFactory;
-
     /**
      * The length of file size per each cache partition file.
      * Partition has value greater than zero only for partitions in OWNING state.
@@ -135,20 +120,6 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
      */
     private final List<CacheConfigurationSender> ccfgSndrs = new CopyOnWriteArrayList<>();
 
-    /** Snapshot data sender. */
-    @GridToStringExclude
-    private final SnapshotSender snpSndr;
-
-    /**
-     * Requested map of cache groups and its partitions to include into snapshot. If array of partitions
-     * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
-     * In this case if all of partitions have OWNING state the index partition also will be included.
-     * <p>
-     * If partitions for particular cache group are not provided that they will be collected and added
-     * on checkpoint under the write lock.
-     */
-    private final Map<Integer, Set<Integer>> parts;
-
     /** {@code true} if all metastorage data must be also included into snapshot. */
     private final boolean withMetaStorage;
 
@@ -167,38 +138,16 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
     /** Future which will be completed when task requested to be closed. Will be executed on system pool. */
     private volatile CompletableFuture<Void> closeFut;
 
-    /** An exception which has been occurred during snapshot processing. */
-    private final AtomicReference<Throwable> err = new AtomicReference<>();
-
     /** Flag indicates that task already scheduled on checkpoint. */
     private final AtomicBoolean started = new AtomicBoolean();
 
     /**
-     * @param e Finished snapshot task future with particular exception.
-     */
-    public SnapshotFutureTask(IgniteCheckedException e) {
-        assert e != null : "Exception for a finished snapshot task must be not null";
-
-        cctx = null;
-        pageStore = null;
-        log = null;
-        snpName = null;
-        srcNodeId = null;
-        tmpSnpWorkDir = null;
-        snpSndr = null;
-
-        err.set(e);
-        startedFut.onDone(e);
-        onDone(e);
-        parts = null;
-        withMetaStorage = false;
-        ioFactory = null;
-        locBuff = null;
-    }
-
-    /**
-     * @param snpName Unique identifier of snapshot task.
-     * @param ioFactory Factory to working with delta as file storage.
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which caused the snapshot task creation.
+     * @param snpName Unique identifier of the snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory for working with snapshot files.
+     * @param snpSndr Snapshot data sender.
      * @param parts Map of cache groups and its partitions to include into snapshot, if set of partitions
      * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
      */
@@ -213,47 +162,20 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
         boolean withMetaStorage,
         ThreadLocal<ByteBuffer> locBuff
     ) {
+        super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+
         assert snpName != null : "Snapshot name cannot be empty or null.";
         assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
         assert snpSndr.executor() != null : "Executor service must be not null.";
         assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
         assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead.";
 
-        this.parts = parts;
         this.withMetaStorage = withMetaStorage;
-        this.cctx = cctx;
         this.pageStore = (FilePageStoreManager)cctx.pageStore();
-        this.log = cctx.logger(SnapshotFutureTask.class);
-        this.snpName = snpName;
-        this.srcNodeId = srcNodeId;
-        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
-        this.snpSndr = snpSndr;
-        this.ioFactory = ioFactory;
         this.locBuff = locBuff;
     }
 
     /**
-     * @return Snapshot name.
-     */
-    public String snapshotName() {
-        return snpName;
-    }
-
-    /**
-     * @return Node id which triggers this operation.
-     */
-    public UUID sourceNodeId() {
-        return srcNodeId;
-    }
-
-    /**
-     * @return Type of snapshot operation.
-     */
-    public Class<? extends SnapshotSender> type() {
-        return snpSndr.getClass();
-    }
-
-    /**
      * @return Set of cache groups included into snapshot operation.
      */
     public Set<Integer> affectedCacheGroups() {
@@ -263,7 +185,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
     /**
      * @param th An exception which occurred during snapshot processing.
      */
-    public void acceptException(Throwable th) {
+    @Override public void acceptException(Throwable th) {
         if (th == null)
             return;
 
@@ -323,7 +245,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
      *
      * @return {@code true} if task started by this call.
      */
-    public boolean start() {
+    @Override public boolean start() {
         if (stopping())
             return false;
 
@@ -676,8 +598,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
 
     /** {@inheritDoc} */
     @Override public boolean cancel() {
-        acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
-            "by external process [snpName=" + snpName + ']'));
+        super.cancel();
 
         try {
             closeAsync().get();
@@ -725,7 +646,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(SnapshotFutureTask.class, this);
+        return S.toString(SnapshotFutureTask.class, this, super.toString());
     }
 
     /** */
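
The new class javadoc above describes the task input: a map from cache group id to the partitions to include, where a null set stands for every OWNING partition of that group (plus the index partition when all of them are OWNING). A tiny sketch of building such a map; the ids are made up:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class SnapshotPartsRequestSketch {
        public static void main(String[] args) {
            Map<Integer, Set<Integer>> parts = new HashMap<>();

            // Only partitions 0, 3 and 7 of a hypothetical cache group with id 1001.
            parts.put(1001, new HashSet<>(Arrays.asList(0, 3, 7)));

            // A null set: every OWNING partition of group 1002 is collected on checkpoint,
            // and the index partition is added if all of them are OWNING.
            parts.put(1002, null);

            System.out.println(parts); // e.g. {1001=[0, 3, 7], 1002=null}
        }
    }
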
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index f5fb6cba..20cb5d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
+import java.io.IOException;
+import java.io.InvalidObjectException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +31,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Snapshot metadata file.
@@ -67,7 +71,7 @@ public class SnapshotMetadata implements Serializable {
      * since for instance, due to the node filter there is no cache data on node.
      */
     @GridToStringInclude
-    private final Map<Integer, Set<Integer>> locParts = new HashMap<>();
+    private transient Map<Integer, Set<Integer>> locParts = new HashMap<>();
 
     /**
      * @param rqId Unique snapshot request id.
@@ -155,7 +159,56 @@ public class SnapshotMetadata implements Serializable {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return Collections.unmodifiableMap(locParts);
+    }
+
+    /** Saves the state of the partitions-per-cache-group <tt>HashMap</tt> to a stream. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out all elements in the proper order.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reconstitutes the partitions-per-cache-group <tt>HashMap</tt> instance from a stream. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = U.newHashMap(size);
+
+        // Read in all elements in the proper order.
+        for (int i = 0; i < size; i++) {
+            int grpId = s.readInt();
+            int total = s.readInt();
+
+            if (total < 0)
+                throw new InvalidObjectException("Illegal size: " + total);
+
+            Set<Integer> parts = U.newHashSet(total);
+
+            for (int k = 0; k < total; k++)
+                parts.add(s.readInt());
+
+            locParts.put(grpId, parts);
+        }
     }
 
     /**
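
The metadata now keeps its partition map in a transient field and serializes it explicitly in writeObject/readObject, as shown in the hunk above. A stripped-down, self-contained illustration of the same idiom and of its round trip; the class and field names are invented:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class PartsHolderSketch implements Serializable {
        private static final long serialVersionUID = 0L;

        /** Skipped by default serialization; written out manually below. */
        private transient Map<Integer, Set<Integer>> parts = new HashMap<>();

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();

            out.writeInt(parts.size());

            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
                out.writeInt(e.getKey());
                out.writeInt(e.getValue().size());

                for (int p : e.getValue())
                    out.writeInt(p);
            }
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();

            int size = in.readInt();

            parts = new HashMap<>(size);

            for (int i = 0; i < size; i++) {
                int grp = in.readInt();
                int cnt = in.readInt();

                Set<Integer> set = new HashSet<>(cnt);

                for (int k = 0; k < cnt; k++)
                    set.add(in.readInt());

                parts.put(grp, set);
            }
        }

        public static void main(String[] args) throws Exception {
            PartsHolderSketch src = new PartsHolderSketch();

            src.parts.put(42, new HashSet<>(Arrays.asList(0, 1, 2)));

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(src);
            }

            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                PartsHolderSketch copy = (PartsHolderSketch)ois.readObject();

                System.out.println(copy.parts); // {42=[0, 1, 2]}
            }
        }
    }
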
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index a227f66..6b4fba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -103,7 +104,8 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
             if (!grps.remove(grpId))
                 continue;
 
-            Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+            Set<Integer> parts = meta.partitions().get(grpId) == null ? Collections.emptySet() :
+                new HashSet<>(meta.partitions().get(grpId));
 
             for (File part : cachePartitionFiles(dir)) {
                 int partId = partId(part.getName());
@@ -213,6 +215,11 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
                 }
             );
         }
+        catch (Throwable t) {
+            log.error("Error executing handler: ", t);
+
+            throw t;
+        }
         finally {
             for (GridComponent comp : snpCtx)
                 comp.stop(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
new file mode 100644
index 0000000..4894032
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+
+/** */
+public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask<Void> {
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which caused the snapshot task creation.
+     * @param snpName Unique identifier of the snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory for working with snapshot files.
+     * @param snpSndr Snapshot data sender.
+     * @param parts Map of cache group ids to the requested partition ids.
+     */
+    public SnapshotResponseRemoteFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean start() {
+        if (F.isEmpty(parts))
+            return false;
+
+        try {
+            List<GroupPartitionId> handled = new ArrayList<>();
+
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                ofNullable(e.getValue()).orElse(Collections.emptySet())
+                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
+            }
+
+            snpSndr.init(handled.size());
+
+            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName);
+
+            List<CompletableFuture<Void>> futs = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName);
+
+            for (SnapshotMetadata meta : metas) {
+                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+
+                if (F.isEmpty(parts0))
+                    continue;
+
+                handled.removeIf(gp -> {
+                    if (ofNullable(parts0.get(gp.getGroupId()))
+                        .orElse(Collections.emptySet())
+                        .contains(gp.getPartitionId())
+                    ) {
+                        futs.add(CompletableFuture.runAsync(() -> {
+                                if (err.get() != null)
+                                    return;
+
+                                File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
+                                    gp.getGroupId());
+
+                                if (cacheDir == null) {
+                                    throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
+                                        ", pair=" + gp + ']');
+                                }
+
+                                File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
+
+                                if (!snpPart.exists()) {
+                                    throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
+                                        ", pair=" + gp + ']');
+                                }
+
+                                snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
+                            },
+                            snpSndr.executor())
+                            .whenComplete((r, t) -> err.compareAndSet(null, t)));
+
+                        return true;
+                    }
+
+                    return false;
+                });
+            }
+
+            if (!handled.isEmpty()) {
+                err.compareAndSet(null, new IgniteException("Snapshot partitions are missing on the local node [snpName=" + snpName +
+                    ", missed=" + handled + ']'));
+            }
+
+            int size = futs.size();
+
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+                .whenComplete((r, t) -> {
+                    Throwable th = ofNullable(err.get()).orElse(t);
+
+                    if (th == null && log.isInfoEnabled()) {
+                        log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
+                            ", rmtNodeId=" + srcNodeId + ']');
+                    }
+
+                    close(th);
+                });
+
+            return true;
+        }
+        catch (Throwable t) {
+            if (err.compareAndSet(null, t))
+                close(t);
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptException(Throwable th) {
+        if (err.compareAndSet(null, th))
+            close(th);
+    }
+
+    /**
+     * @param th Close exception, if one occurred.
+     */
+    private void close(@Nullable Throwable th) {
+        if (th == null) {
+            snpSndr.close(null);
+            onDone((Void)null);
+        }
+        else {
+            snpSndr.close(th);
+            onDone(th);
+        }
+    }
+}
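
The restore-process changes below are driven through the public snapshot API. A minimal usage sketch, assuming an active persistent cluster; the configuration path, snapshot name and cache group name are placeholders:

    import java.util.Arrays;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;

    class RestoreSnapshotSketch {
        public static void main(String[] args) {
            // Assumes a node config with persistence enabled and an already ACTIVE cluster.
            try (Ignite ignite = Ignition.start("config/ignite-config.xml")) {
                // The cache groups being restored must not already exist in the cluster.
                // Passing null instead of the collection restores all groups in the snapshot.
                ignite.snapshot()
                    .restoreSnapshot("snapshot_2021_11_20", Arrays.asList("shared-group"))
                    .get();
            }
        }
    }
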
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 99a730a..c796299 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -26,30 +29,38 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
@@ -62,16 +73,23 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
@@ -96,7 +114,10 @@ public class SnapshotRestoreProcess {
     private final GridKernalContext ctx;
 
     /** Cache group restore prepare phase. */
-    private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
+
+    /** Cache group restore preload partitions phase. */
+    private final DistributedProcess<UUID, Boolean> preloadProc;
 
     /** Cache group restore cache start phase. */
     private final DistributedProcess<UUID, Boolean> cacheStartProc;
@@ -124,6 +145,9 @@ public class SnapshotRestoreProcess {
         prepareRestoreProc = new DistributedProcess<>(
             ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
 
+        preloadProc = new DistributedProcess<>(
+            ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, this::preload, this::finishPreload);
+
         cacheStartProc = new DistributedProcess<>(
             ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
 
@@ -248,19 +272,16 @@ public class SnapshotRestoreProcess {
                 cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v));
 
             for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
-                SnapshotMetadata meta = F.first(entry.getValue());
-
-                assert meta != null : entry.getKey().id();
-
-                if (!entry.getKey().consistentId().toString().equals(meta.consistentId()))
-                    continue;
+                dataNodes.add(entry.getKey().id());
 
-                if (snpBltNodes == null)
-                    snpBltNodes = new HashSet<>(meta.baselineNodes());
+                for (SnapshotMetadata meta : entry.getValue()) {
+                    assert meta != null : entry.getKey().id();
 
-                dataNodes.add(entry.getKey().id());
+                    if (snpBltNodes == null)
+                        snpBltNodes = new HashSet<>(meta.baselineNodes());
 
-                reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+                    reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+                }
             }
 
             if (snpBltNodes == null) {
@@ -277,20 +298,10 @@ public class SnapshotRestoreProcess {
                 return;
             }
 
-            Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
-                node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
-
-            snpBltNodes.removeAll(bltNodes);
-
-            if (!snpBltNodes.isEmpty()) {
-                finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
-                    "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
-
-                return;
-            }
+            Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
 
             SnapshotOperationRequest req =
-                new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+                new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, new HashSet<>(bltNodes));
 
             prepareRestoreProc.start(req.requestId(), req);
         });
@@ -315,28 +326,36 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * Check if the cache or group with the specified name is currently being restored from the snapshot.
-     *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param ccfg Cache configuration.
      * @return {@code True} if the cache or group with the specified name is currently being restored.
      */
-    public boolean isRestoring(String cacheName, @Nullable String grpName) {
-        assert cacheName != null;
+    public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+        return isRestoring(ccfg, opCtx);
+    }
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+    /**
+     * Check if the cache or group with the specified name is currently being restored from the snapshot.
+     * @param ccfg Cache configuration.
+     * @param opCtx Restore operation context.
+     * @return {@code True} if the cache or group with the specified name is currently being restored.
+     */
+    private boolean isRestoring(CacheConfiguration<?, ?> ccfg, @Nullable SnapshotRestoreContext opCtx) {
+        assert ccfg != null;
 
-        if (opCtx0 == null)
+        if (opCtx == null)
             return false;
 
-        Map<Integer, StoredCacheData> cacheCfgs = opCtx0.cfgs;
+        Map<Integer, StoredCacheData> cacheCfgs = opCtx.cfgs;
+
+        String cacheName = ccfg.getName();
+        String grpName = ccfg.getGroupName();
 
         int cacheId = CU.cacheId(cacheName);
 
         if (cacheCfgs.containsKey(cacheId))
             return true;
 
-        for (File grpDir : opCtx0.dirs) {
+        for (File grpDir : opCtx.dirs) {
             String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
 
             if (grpName != null) {
@@ -364,7 +383,7 @@ public class SnapshotRestoreProcess {
         if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
             return Collections.emptySet();
 
-        return Collections.unmodifiableSet(opCtx0.nodes);
+        return new HashSet<>(opCtx0.nodes());
     }
 
     /**
@@ -412,7 +431,7 @@ public class SnapshotRestoreProcess {
     public void onNodeLeft(UUID leftNodeId) {
         SnapshotRestoreContext opCtx0 = opCtx;
 
-        if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) {
+        if (opCtx0 != null && opCtx0.nodes().contains(leftNodeId)) {
             opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
                 "Required node has left the cluster [nodeId=" + leftNodeId + ']'));
         }
@@ -498,17 +517,18 @@ public class SnapshotRestoreProcess {
      * @param req Request to prepare cache group restore from the snapshot.
      * @return Result future.
      */
-    private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
+    private IgniteInternalFuture<SnapshotRestoreOperationResponse> prepare(SnapshotOperationRequest req) {
         if (ctx.clientNode())
             return new GridFinishedFuture<>();
 
         try {
             DiscoveryDataClusterState state = ctx.state().clusterState();
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
 
             if (state.state() != ClusterState.ACTIVE || state.transition())
                 throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
 
-            if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+            if (snpMgr.isSnapshotCreating())
                 throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
 
             if (ctx.encryption().isMasterKeyChangeInProgress()) {
@@ -530,6 +550,13 @@ public class SnapshotRestoreProcess {
                 }
             }
 
+            if (log.isInfoEnabled()) {
+                log.info("Starting local snapshot prepare restore operation" +
+                    " [reqId=" + req.requestId() +
+                    ", snapshot=" + req.snapshotName() +
+                    ", caches=" + req.groups() + ']');
+            }
+
             SnapshotRestoreContext opCtx0 = prepareContext(req);
 
             synchronized (this) {
@@ -537,13 +564,10 @@ public class SnapshotRestoreProcess {
 
                 ClusterSnapshotFuture fut0 = fut;
 
-                if (fut0 != null && fut0.interruptEx != null)
-                    opCtx0.err.compareAndSet(null, fut0.interruptEx);
+                if (fut0 != null)
+                    opCtx0.errHnd.accept(fut0.interruptEx);
             }
 
-            if (opCtx0.dirs.isEmpty())
-                return new GridFinishedFuture<>();
-
             // Ensure that shared cache groups has no conflicts.
             for (StoredCacheData cfg : opCtx0.cfgs.values()) {
                 ensureCacheAbsent(cfg.config().getName());
@@ -552,46 +576,11 @@ public class SnapshotRestoreProcess {
                     ensureCacheAbsent(cfg.config().getGroupName());
             }
 
-            if (log.isInfoEnabled()) {
-                log.info("Starting local snapshot restore operation" +
-                    " [reqId=" + req.requestId() +
-                    ", snapshot=" + req.snapshotName() +
-                    ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
-            }
-
-            Consumer<Throwable> errHnd = (ex) -> opCtx.err.compareAndSet(null, ex);
-            BooleanSupplier stopChecker = () -> opCtx.err.get() != null;
-            GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
-
             if (ctx.isStopping())
-                throw new NodeStoppingException("Node is stopping.");
-
-            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+                throw new NodeStoppingException("The node is stopping: " + ctx.localNodeId());
 
-            restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
-                .thenAccept(res -> {
-                    try {
-                        Throwable err = opCtx.err.get();
-
-                        if (err != null)
-                            throw err;
-
-                        for (File src : opCtx0.dirs)
-                            Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
-                    }
-                    catch (Throwable t) {
-                        log.error("Unable to restore cache group(s) from the snapshot " +
-                            "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t);
-
-                        retFut.onDone(t);
-
-                        return;
-                    }
-
-                    retFut.onDone(new ArrayList<>(opCtx.cfgs.values()));
-                });
-
-            return retFut;
+            return new GridFinishedFuture<>(new SnapshotRestoreOperationResponse(opCtx0.cfgs.values(),
+                opCtx0.metasPerNode.get(ctx.localNodeId())));
         }
         catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
             log.error("Unable to restore cache group(s) from the snapshot " +
@@ -605,85 +594,11 @@ public class SnapshotRestoreProcess {
      * @param cacheDir Cache directory.
      * @return Temporary directory.
      */
-    private File formatTmpDirName(File cacheDir) {
+    private static File formatTmpDirName(File cacheDir) {
         return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
     }
 
     /**
-     * Copy partition files and update binary metadata.
-     *
-     * @param snpName Snapshot name.
-     * @param dirs Cache directories to restore from the snapshot.
-     * @param updateMeta Update binary metadata flag.
-     * @param stopChecker Process interrupt checker.
-     * @param errHnd Error handler.
-     * @throws IgniteCheckedException If failed.
-     */
-    private CompletableFuture<Void> restoreAsync(
-        String snpName,
-        Collection<File> dirs,
-        boolean updateMeta,
-        BooleanSupplier stopChecker,
-        Consumer<Throwable> errHnd
-    ) throws IgniteCheckedException {
-        IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
-        String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
-
-        List<CompletableFuture<Void>> futs = new ArrayList<>();
-
-        if (updateMeta) {
-            File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
-
-            futs.add(CompletableFuture.runAsync(() -> {
-                try {
-                    ctx.cacheObjects().updateMetadata(binDir, stopChecker);
-                }
-                catch (Throwable t) {
-                    errHnd.accept(t);
-                }
-            }, snapshotMgr.snapshotExecutorService()));
-        }
-
-        for (File cacheDir : dirs) {
-            File tmpCacheDir = formatTmpDirName(cacheDir);
-            File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName),
-                Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
-
-            assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
-
-            for (File snpFile : snpCacheDir.listFiles()) {
-                futs.add(CompletableFuture.runAsync(() -> {
-                    if (stopChecker.getAsBoolean())
-                        return;
-
-                    try {
-                        if (Thread.interrupted())
-                            throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
-
-                        File target = new File(tmpCacheDir, snpFile.getName());
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("Copying file from the snapshot " +
-                                "[snapshot=" + snpName +
-                                ", src=" + snpFile +
-                                ", target=" + target + "]");
-                        }
-
-                        IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
-                    }
-                    catch (Throwable t) {
-                        errHnd.accept(t);
-                    }
-                }, ctx.cache().context().snapshotMgr().snapshotExecutorService()));
-            }
-        }
-
-        int futsSize = futs.size();
-
-        return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
-    }
-
-    /**
      * @param req Request to prepare cache group restore from the snapshot.
      * @return Snapshot restore operation context.
      * @throws IgniteCheckedException If failed.
@@ -693,76 +608,76 @@ public class SnapshotRestoreProcess {
             throw new IgniteCheckedException(OP_REJECT_MSG +
                 "The previous snapshot restore operation was not completed.");
         }
-
         GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
 
-        SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
+        List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName());
+
+        // Collection of baseline nodes that must survive and additional discovery data required for the affinity calculation.
+        DiscoCache discoCache = ctx.discovery().discoCache();
+
+        if (!F.transform(discoCache.aliveBaselineNodes(), F.node2id()).containsAll(req.nodes()))
+            throw new IgniteCheckedException("Restore context cannot be inited since the required baseline nodes missed: " + discoCache);
+
+        DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
 
-        if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
-            return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
+        if (F.first(metas) == null)
+            return new SnapshotRestoreContext(req, discoCache0, Collections.emptyMap(), cctx.localNodeId(), Collections.emptyList());
 
-        if (meta.pageSize() != cctx.database().pageSize()) {
+        if (F.first(metas).pageSize() != cctx.database().pageSize()) {
             throw new IgniteCheckedException("Incompatible memory page size " +
-                "[snapshotPageSize=" + meta.pageSize() +
+                "[snapshotPageSize=" + F.first(metas).pageSize() +
                 ", local=" + cctx.database().pageSize() +
                 ", snapshot=" + req.snapshotName() +
                 ", nodeId=" + cctx.localNodeId() + ']');
         }
 
-        List<File> cacheDirs = new ArrayList<>();
         Map<String, StoredCacheData> cfgsByName = new HashMap<>();
         FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
 
         // Collect the cache configurations and prepare a temporary directory for copying files.
         // Metastorage can be restored only manually by directly copying files.
-        for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
-            name -> !METASTORAGE_CACHE_NAME.equals(name)))
-        {
-            String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
+        for (SnapshotMetadata meta : metas) {
+            for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
+                name -> !METASTORAGE_CACHE_NAME.equals(name))) {
+                String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
 
-            if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
-                continue;
+                if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
+                    continue;
 
-            File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
+                File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
 
-            if (cacheDir.exists()) {
-                if (!cacheDir.isDirectory()) {
-                    throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
-                        "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
-                }
+                if (cacheDir.exists()) {
+                    if (!cacheDir.isDirectory()) {
+                        throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
+                            "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
+                    }
 
-                if (cacheDir.list().length > 0) {
-                    throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
-                        "[group=" + grpName + ", dir=" + cacheDir + ']');
-                }
+                    if (cacheDir.list().length > 0) {
+                        throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
+                            "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    }
 
-                if (!cacheDir.delete()) {
-                    throw new IgniteCheckedException("Unable to remove empty cache directory " +
-                        "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    if (!cacheDir.delete()) {
+                        throw new IgniteCheckedException("Unable to remove empty cache directory " +
+                            "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    }
                 }
-            }
 
-            File tmpCacheDir = formatTmpDirName(cacheDir);
+                File tmpCacheDir = formatTmpDirName(cacheDir);
 
-            if (tmpCacheDir.exists()) {
-                throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
-                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
-            }
+                if (tmpCacheDir.exists()) {
+                    throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
+                        "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+                }
 
-            if (!tmpCacheDir.mkdir()) {
-                throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
-                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+                pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
             }
-
-            cacheDirs.add(cacheDir);
-
-            pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
         }
 
         Map<Integer, StoredCacheData> cfgsById =
             cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
 
-        return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
+        return new SnapshotRestoreContext(req, discoCache0, cfgsById, cctx.localNodeId(), metas);
     }
 
     /**
@@ -770,7 +685,7 @@ public class SnapshotRestoreProcess {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
+    private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
@@ -787,31 +702,358 @@ public class SnapshotRestoreProcess {
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+                    opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ctx Kernal context.
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Affinity assignment cache calculated for the given cache configuration.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of the given partitions, or {@code null} otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+            }
+
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+                                .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            List<PartitionRestoreFuture> rmtAwaitParts = rmtLoadParts.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            // This is necessary for sending only one partitions request per each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(
+                opCtx0.metasPerNode.entrySet()
+                    .stream()
+                    .filter(e -> !e.getKey().equals(ctx.localNodeId()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId)));
+
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + ']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+                    ctx.cache().context().snapshotMgr()
+                        .requestRemoteSnapshotFiles(m.getKey(),
+                            opCtx0.snpName,
+                            m.getValue(),
+                            opCtx0.stopChecker,
+                            (snpFile, t) -> {
+                                if (opCtx0.stopChecker.getAsBoolean())
+                                    throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
+
+                                if (t == null) {
+                                    int grpId = CU.cacheId(cacheGroupName(snpFile.getParentFile()));
+                                    int partId = partId(snpFile.getName());
+
+                                    PartitionRestoreFuture partFut = F.find(opCtx0.locProgress.get(grpId),
+                                        null,
+                                        new IgnitePredicate<PartitionRestoreFuture>() {
+                                            @Override public boolean apply(PartitionRestoreFuture f) {
+                                                return f.partId == partId;
+                                            }
+                                        });
+
+                                    assert partFut != null : snpFile.getAbsolutePath();
+
+                                    File tmpCacheDir = formatTmpDirName(grpToDir.get(grpId));
+
+                                    Path partFile = Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName());
+
+                                    try {
+                                        IgniteSnapshotManager.copy(snpMgr.ioFactory(),
+                                            snpFile,
+                                            partFile.toFile(),
+                                            snpFile.length());
+
+                                        partFut.complete(partFile);
+                                    }
+                                    catch (Exception e) {
+                                        opCtx0.errHnd.accept(e);
+                                        completeListExceptionally(rmtAwaitParts, e);
+                                    }
+                                }
+                                else {
+                                    opCtx0.errHnd.accept(t);
+                                    completeListExceptionally(rmtAwaitParts, t);
+                                }
+                            });
+                }
+            }
+            catch (IgniteCheckedException e) {
+                opCtx0.errHnd.accept(e);
+                completeListExceptionally(rmtAwaitParts, e);
+            }
+
+            List<PartitionRestoreFuture> allParts = opCtx0.locProgress.values().stream().flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            int size = allParts.size();
+
+            CompletableFuture.allOf(allParts.toArray(new CompletableFuture[size]))
+                .runAfterBothAsync(metaFut, () -> {
+                    try {
+                        if (opCtx0.stopChecker.getAsBoolean())
+                            throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
+
+                        for (File src : opCtx0.dirs)
+                            Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                }, snpMgr.snapshotExecutorService())
+                .whenComplete((r, t) -> opCtx0.errHnd.accept(t))
+                .whenComplete((res, t) -> {
+                    Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
+
+                    if (t0 == null)
+                        retFut.onDone(true);
+                    else {
+                        log.error("Unable to restore cache group(s) from a snapshot " +
+                            "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t0);
+
+                        retFut.onDone(t0);
+                    }
+                });
+        }
+        catch (Exception ex) {
+            opCtx0.errHnd.accept(ex);
+
+            return new GridFinishedFuture<>(ex);
+        }
+
+        return retFut;
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
+
+        opCtx0.errHnd.accept(failure);
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
             cacheStartProc.start(reqId, reqId);
     }
 
@@ -843,7 +1085,8 @@ public class SnapshotRestoreProcess {
 
         // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
         // the cluster during the cache startup, the whole procedure will be rolled back.
-        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false,
+            IgniteUuid.fromUuid(reqId));
     }
 
     /**
@@ -858,7 +1101,7 @@ public class SnapshotRestoreProcess {
         SnapshotRestoreContext opCtx0 = opCtx;
 
         Exception failure = errs.values().stream().findFirst().
-            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+            orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
 
         if (failure == null) {
             finishProcess(reqId);
@@ -873,11 +1116,48 @@ public class SnapshotRestoreProcess {
     }
 
     /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @param filter Filter of cache group id and partition id pairs to include into the result.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, List<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        List<UUID> nodes = new ArrayList<>(metas.keySet());
+        Collections.shuffle(nodes);
+
+        Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
+        nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
+
+        for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
+                Map<Integer, Set<Integer>> parts = ofNullable(meta.partitions()).orElse(Collections.emptyMap());
+
+                for (Map.Entry<Integer, Set<Integer>> metaParts : parts.entrySet()) {
+                    for (Integer partId : metaParts.getValue()) {
+                        if (filter.test(metaParts.getKey(), partId)) {
+                            nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
+                                .computeIfAbsent(metaParts.getKey(), k -> new HashSet<>())
+                                .add(partId);
+                        }
+                    }
+                }
+            }
+        }
+
+        return nodeToSnp;
+    }
+
+    /**
      * @param reqNodes Set of required topology nodes.
      * @param respNodes Set of responding topology nodes.
      * @return Error, if no response was received from the required topology node.
      */
-    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
+    private Exception checkNodeLeft(Collection<UUID> reqNodes, Set<UUID> respNodes) {
         if (!respNodes.containsAll(reqNodes)) {
             Set<UUID> leftNodes = new HashSet<>(reqNodes);
 
@@ -907,46 +1187,46 @@ public class SnapshotRestoreProcess {
 
         synchronized (this) {
             opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+        }
 
-            try {
-                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
-                    if (log.isInfoEnabled()) {
-                        log.info("Removing restored cache directories [reqId=" + reqId +
-                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
-                    }
+        try {
+            ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+                if (log.isInfoEnabled()) {
+                    log.info("Removing restored cache directories [reqId=" + reqId +
+                        ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+                }
 
-                    IgniteCheckedException ex = null;
+                IgniteCheckedException ex = null;
 
-                    for (File cacheDir : opCtx0.dirs) {
-                        File tmpCacheDir = formatTmpDirName(cacheDir);
+                for (File cacheDir : opCtx0.dirs) {
+                    File tmpCacheDir = formatTmpDirName(cacheDir);
 
-                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+                    if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+                        log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+                            "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
 
-                            ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
-                        }
+                        ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
+                    }
 
-                        if (cacheDir.exists() && !U.delete(cacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+                    if (cacheDir.exists() && !U.delete(cacheDir)) {
+                        log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+                            "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
 
-                            ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
-                        }
+                        ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
                     }
+                }
 
-                    if (ex != null)
-                        retFut.onDone(ex);
-                    else
-                        retFut.onDone(true);
-                });
-            }
-            catch (RejectedExecutionException e) {
-                log.error("Unable to perform rollback routine, task has been rejected " +
-                    "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+                if (ex != null)
+                    retFut.onDone(ex);
+                else
+                    retFut.onDone(true);
+            });
+        }
+        catch (RejectedExecutionException e) {
+            log.error("Unable to perform rollback routine, task has been rejected " +
+                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
 
-                retFut.onDone(e);
-            }
+            retFut.onDone(e);
         }
 
         return retFut;
@@ -968,8 +1248,8 @@ public class SnapshotRestoreProcess {
 
         SnapshotRestoreContext opCtx0 = opCtx;
 
-        if (!res.keySet().containsAll(opCtx0.nodes)) {
-            Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes);
+        if (!res.keySet().containsAll(opCtx0.nodes())) {
+            Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes());
 
             leftNodes.removeAll(res.keySet());
 
@@ -981,6 +1261,83 @@ public class SnapshotRestoreProcess {
     }
 
     /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param srcDir Snapshot directory to copy from.
+     * @param targetDir Destination directory to copy to.
+     * @param partFut Partition future completed when the partition file copy finishes.
+     */
+    private static void copyLocalAsync(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File srcDir,
+        File targetDir,
+        PartitionRestoreFuture partFut
+    ) {
+        File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
+        Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+        CompletableFuture.supplyAsync(() -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());
+
+                return partFile;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+            .whenComplete((r, t) -> {
+                if (t == null)
+                    partFut.complete(partFile);
+                else
+                    partFut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param affCache Affinity cache.
+     * @param node Cluster node to get assigned partitions.
+     * @return The set of partitions assigned to the given node.
+     */
+    private static <T> Set<T> nodeAffinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory
+    ) {
+        return IntStream.range(0, affCache.partitions())
+            .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
+            .mapToObj(factory)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param col Collection of partition futures to complete.
+     * @param ex Exception to set.
+     */
+    private static void completeListExceptionally(List<PartitionRestoreFuture> col, Throwable ex) {
+        for (PartitionRestoreFuture f : col)
+            f.completeExceptionally(ex);
+    }
+
+    /**
+     * @param map Map of cache group ids to partition ids.
+     * @return String representation.
+     */
+    private static String partitionsMapToCompactString(Map<Integer, Set<Integer>> map) {
+        return map.entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> S.compact(e.getValue())))
+            .toString();
+    }
+
+    /**
      * Cache group restore from snapshot operation context.
      */
     private static class SnapshotRestoreContext {
@@ -990,15 +1347,43 @@ public class SnapshotRestoreProcess {
         /** Snapshot name. */
         private final String snpName;
 
-        /** Baseline node IDs that must be alive to complete the operation. */
-        private final Set<UUID> nodes;
+        /** Baseline discovery cache for node IDs that must be alive to complete the operation. */
+        private final DiscoCache discoCache;
 
-        /** List of restored cache group directories. */
-        private final Collection<File> dirs;
+        /** Operational node id. */
+        private final UUID opNodeId;
+
+        /**
+         * Set of restored cache group paths on the local node. Collected when all cache configurations are received
+         * from the <tt>prepare</tt> distributed process.
+         */
+        private final Set<File> dirs = new HashSet<>();
 
         /** The exception that led to the interruption of the process. */
         private final AtomicReference<Throwable> err = new AtomicReference<>();
 
+        /** Distribution of snapshot metadata files across the cluster. */
+        private final Map<UUID, List<SnapshotMetadata>> metasPerNode = new HashMap<>();
+
+        /** Context error handler. */
+        private final Consumer<Throwable> errHnd = (ex) -> err.compareAndSet(null, ex);
+
+        /** Stop condition checker. */
+        private final BooleanSupplier stopChecker = () -> err.get() != null;
+
+        /** Progress of processing cache group partitions on the local node. */
+        private final Map<Integer, Set<PartitionRestoreFuture>> locProgress = new HashMap<>();
+
+        /**
+         * The stop future responsible for stopping cache groups during the rollback phase. It is completed when the rollback
+         * process executes and all the cache group stop actions complete (processCacheStopRequestOnExchangeDone finishes
+         * successfully and all the data is deleted from disk).
+         */
+        private final GridFutureAdapter<Void> locStopCachesCompleteFut = new GridFutureAdapter<>();
+
+        /** Calculated affinity assignment cache per each cache group. */
+        private final Map<String, GridAffinityAssignmentCache> affCache = new ConcurrentHashMap<>();
+
         /** Cache ID to configuration mapping. */
         private volatile Map<Integer, StoredCacheData> cfgs;
 
@@ -1007,17 +1392,90 @@ public class SnapshotRestoreProcess {
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline node IDs that must be alive to complete the restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private final List<StoredCacheData> ccfgs;
+
+        /** Snapshot metadata files on local node. */
+        private final List<SnapshotMetadata> metas;
+
+        /**
+         * @param ccfgs Cache configurations on local node.
+         * @param metas Snapshot metadata files on local node.
+         */
+        public SnapshotRestoreOperationResponse(
+            Collection<StoredCacheData> ccfgs,
+            Collection<SnapshotMetadata> metas
+        ) {
+            this.ccfgs = new ArrayList<>(ccfgs);
+            this.metas = new ArrayList<>(metas);
+        }
+    }
+
+    /** Future will be completed when partition processing ends. */
+    private static class PartitionRestoreFuture extends CompletableFuture<Path> {
+        /** Partition id. */
+        private final int partId;
+
+        /**
+         * @param partId Partition id.
+         */
+        private PartitionRestoreFuture(int partId) {
+            this.partId = partId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PartitionRestoreFuture future = (PartitionRestoreFuture)o;
+
+            return partId == future.partId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(partId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PartitionRestoreFuture.class, this);
         }
     }
 }
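
The errHnd and stopChecker fields added to SnapshotRestoreContext above implement a "first error wins" cancellation pattern: the first reported failure is stored, and every long-running stage polls the stop condition to abort cooperatively. A minimal, self-contained sketch of the same pattern (illustrative only, not part of the patch; class, stage and message names are made up):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.BooleanSupplier;
    import java.util.function.Consumer;

    /** Illustrative sketch of the error-handler/stop-checker pattern used by the restore context. */
    public class FirstErrorWins {
        public static void main(String[] args) {
            AtomicReference<Throwable> err = new AtomicReference<>();

            // Only the first reported error is kept; later errors are ignored.
            Consumer<Throwable> errHnd = ex -> err.compareAndSet(null, ex);

            // Long-running stages poll this to abort cooperatively.
            BooleanSupplier stopChecker = () -> err.get() != null;

            for (int stage = 0; stage < 5; stage++) {
                if (stopChecker.getAsBoolean()) {
                    System.out.println("Aborting at stage " + stage + ": " + err.get().getMessage());

                    break;
                }

                try {
                    if (stage == 2)
                        throw new IllegalStateException("stage " + stage + " failed");

                    System.out.println("Stage " + stage + " completed");
                }
                catch (Throwable t) {
                    errHnd.accept(t);
                }
            }
        }
    }
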
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 730b9e1..d91b5bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -453,6 +453,11 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
         RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
 
         /**
+         * Cache group restore preload phase.
+         */
+        RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
+
+        /**
          * Cache group restore cache start phase.
          */
         RESTORE_CACHE_GROUP_SNAPSHOT_START,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 77de5cd..e7c06a1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -716,7 +716,7 @@ public class GridFunc {
      *
      * @return Closure which converts node to node ID.
      */
-    public static IgniteClosure<ClusterNode, UUID> node2id() {
+    public static IgniteClosure<? super ClusterNode, UUID> node2id() {
         return NODE2ID;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 93ef0e2..82847cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -37,6 +37,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
@@ -358,9 +360,32 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
         Function<Integer, V> factory,
         CacheConfiguration<Integer, V>... ccfgs
     ) throws Exception {
-        for (int g = 0; g < grids; g++)
-            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(g))
-                .setCacheConfiguration(ccfgs)));
+        return startGridsWithCache(grids, keys, factory, (id, cfg) -> cfg.getWorkDirectory(), ccfgs);
+    }
+
+    /**
+     * @param grids Number of Ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param factory Factory which produces values.
+     * @param newWorkDir Function resolving the work directory for each started node.
+     * @param ccfgs Cache configurations.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    protected <V> IgniteEx startGridsWithCache(
+        int grids,
+        int keys,
+        Function<Integer, V> factory,
+        BiFunction<Integer, IgniteConfiguration, String> newWorkDir,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        for (int g = 0; g < grids; g++) {
+            IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(g))
+                .setCacheConfiguration(ccfgs));
+
+            cfg.setWorkDirectory(newWorkDir.apply(g, cfg));
+
+            startGrid(cfg);
+        }
 
         IgniteEx ig = grid(0);
 
@@ -519,32 +544,6 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param snpName Unique snapshot name.
-     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
-     * @param snpSndr Sender which used for snapshot sub-task processing.
-     * @return Future which will be completed when snapshot is done.
-     */
-    protected SnapshotFutureTask startLocalSnapshotTask(
-        GridCacheSharedContext<?, ?> cctx,
-        String snpName,
-        Map<Integer, Set<Integer>> parts,
-        SnapshotSender snpSndr
-    ) throws IgniteCheckedException {
-        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, encryption, snpSndr);
-
-        snpFutTask.start();
-
-        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
-        // due to checkpoint already running and we need to schedule the next one
-        // right after current will be completed.
-        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
-
-        snpFutTask.started().get();
-
-        return snpFutTask;
-    }
-
-    /**
      * @param grids Grids to block snapshot executors.
      * @return Wrapped snapshot executor list.
      */
@@ -604,6 +603,39 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cctx Cache shared context.
+     * @param snpName Unique snapshot name.
+     * @param parts Map of cache group IDs to the partition IDs to include in the snapshot.
+     * @param withMetaStorage {@code True} to include the cluster metastorage into the snapshot.
+     * @param snpSndr Sender used for snapshot sub-task processing.
+     * @return Future which will be completed when the snapshot is done.
+     */
+    protected static IgniteInternalFuture<?> startLocalSnapshotTask(
+        GridCacheSharedContext<?, ?> cctx,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        boolean withMetaStorage,
+        SnapshotSender snpSndr
+    ) throws IgniteCheckedException {
+        AbstractSnapshotFutureTask<?> task = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts,
+            withMetaStorage, snpSndr);
+
+        if (!(task instanceof SnapshotFutureTask))
+            throw new IgniteCheckedException("Snapshot task hasn't been registered: " + task);
+
+        SnapshotFutureTask snpFutTask = (SnapshotFutureTask)task;
+
+        snpFutTask.start();
+
+        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+        // because a checkpoint is already running, so the next one must be scheduled
+        // right after the current one completes.
+        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
+
+        snpFutTask.started().get();
+
+        return snpFutTask;
+    }
+
+    /**
      * @param ignite Ignite instance to resolve discovery spi to.
      * @return BlockingCustomMessageDiscoverySpi instance.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 49a3c62..c5b4ccb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -598,6 +598,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
 
         snpFut.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
index 579e9c3..a876f74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
@@ -21,12 +21,25 @@ import java.util.Collections;
 import java.util.function.Function;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.lang.IgniteFuture;
 import org.junit.runners.Parameterized;
 
 /**
  * Snapshot restore test base.
  */
 public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest {
+    /** Cache 1 name. */
+    protected static final String CACHE1 = "cache1";
+
+    /** Cache 2 name. */
+    protected static final String CACHE2 = "cache2";
+
+    /** Default shared cache group name. */
+    protected static final String SHARED_GRP = "shared";
+
     /** Parameters. Encrypted snapshots are not supported. */
     @Parameterized.Parameters(name = "Encryption is disabled")
     public static Iterable<Boolean> disabledEncryption() {
@@ -43,6 +56,29 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps
         return startGridsWithSnapshot(nodesCnt, keysCnt, false);
     }
 
+    /**
+     * @param spi Test communication spi.
+     * @param restorePhase The type of distributed process on which communication is blocked.
+     * @param grpName Cache group name.
+     * @return Snapshot restore future.
+     * @throws InterruptedException if interrupted.
+     */
+    protected IgniteFuture<Void> waitForBlockOnRestore(
+        TestRecordingCommunicationSpi spi,
+        DistributedProcess.DistributedProcessType restorePhase,
+        String grpName
+    ) throws InterruptedException {
+        spi.blockMessages((node, msg) ->
+            msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
+
+        spi.waitForBlocked();
+
+        return fut;
+    }
+
     /** */
     protected class BinaryValueBuilder implements Function<Integer, Object> {
         /** Binary type name. */
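
The waitForBlockOnRestore helper moved into the base test above blocks the SingleNodeMessage whose type() matches the ordinal of the given DistributedProcessType, which now includes the new RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD phase. A hypothetical usage sketch (written against the test harness shown in this patch; the test name is invented, imports mirror those added above, and it assumes SNAPSHOT_NAME already exists while the restored group is not started):

    /** Illustrative only: pause a restore during the new PRELOAD phase, then let it finish. */
    @Test
    public void exampleBlockPreloadPhase() throws Exception {
        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(1));

        // The restore future stays incomplete while the preload single-node message is blocked.
        IgniteFuture<Void> fut = waitForBlockOnRestore(spi,
            DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
            DEFAULT_CACHE_NAME);

        assertFalse(fut.isDone());

        // Release the blocked message and wait for the restore to complete.
        spi.stopBlock();

        fut.get(TIMEOUT);
    }
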
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index e6f93b5..b6361e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -59,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -74,6 +74,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -86,15 +87,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
     /** Type name used for binary and SQL. */
     private static final String TYPE_NAME = "CustomType";
 
-    /** Cache 1 name. */
-    private static final String CACHE1 = "cache1";
-
-    /** Cache 2 name. */
-    private static final String CACHE2 = "cache2";
-
-    /** Default shared cache group name. */
-    private static final String SHARED_GRP = "shared";
-
     /** Reset consistent ID flag. */
     private boolean resetConsistentId;
 
@@ -164,14 +156,25 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         awaitPartitionMapExchange();
 
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
         // Restore all cache groups.
         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
 
+        awaitPartitionMapExchange(true, true, null, true);
+
         assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
         assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
         assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        // Ensure that no remote snapshot requests occurred.
+        for (Ignite g : G.allGrids()) {
+            assertTrue("Snapshot files remote requests must not happened due to all the files are available locally",
+                TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -364,14 +367,10 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         resetBaselineTopology();
 
-        IgniteFuture<Void> fut =
-            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
-        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
-
-        ensureCacheAbsent(dfltCacheCfg);
-
-        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+        assertCacheKeys(grid(0).cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
     }
 
     /** @throws Exception If failed. */
@@ -507,7 +506,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
      */
     @Test
     public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
-        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
+        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, IgniteCheckedException.class,
             "Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
     }
 
@@ -555,7 +554,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
 
-        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, DEFAULT_CACHE_NAME);
 
         IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
 
@@ -574,12 +573,15 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
 
-        GridTestUtils.assertThrowsAnyCause(
-            log,
-            () -> startGrid(3),
-            IgniteSpiException.class,
-            "to add the node to cluster - remove directories with the caches"
-        );
+        dfltCacheCfg = null;
+
+        // Should start successfully.
+        Ignite ignite = startGrid(3);
+
+        resetBaselineTopology();
+        awaitPartitionMapExchange();
+
+        assertNull(ignite.cache(DEFAULT_CACHE_NAME));
     }
 
     /** @throws Exception If failed. */
@@ -594,7 +596,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
         CountDownLatch stopLatch = new CountDownLatch(1);
 
         spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
-            ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
+            ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD.ordinal());
 
         String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
             PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
@@ -760,29 +762,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
     }
 
     /**
-     * @param spi Test communication spi.
-     * @param restorePhase The type of distributed process on which communication is blocked.
-     * @param grpName Cache group name.
-     * @return Snapshot restore future.
-     * @throws InterruptedException if interrupted.
-     */
-    private IgniteFuture<Void> waitForBlockOnRestore(
-        TestRecordingCommunicationSpi spi,
-        DistributedProcessType restorePhase,
-        String grpName
-    ) throws InterruptedException {
-        spi.blockMessages((node, msg) ->
-            msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
-
-        IgniteFuture<Void> fut =
-            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
-
-        spi.waitForBlocked();
-
-        return fut;
-    }
-
-    /**
      * Custom I/O factory to preprocessing created files.
      */
     private static class CustomFileIOFactory implements FileIOFactory {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index 55468f4..dd83af1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -131,7 +131,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         }, 5, "cache-loader-");
 
         // Register task but not schedule it on the checkpoint.
-        SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+        SnapshotFutureTask snpFutTask = (SnapshotFutureTask)mgr.registerSnapshotTask(SNAPSHOT_NAME,
             cctx.localNodeId(),
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
             encryption,
@@ -256,6 +256,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
 
         // Check the right exception thrown.
@@ -279,6 +280,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> fut = startLocalSnapshotTask(ig.context().cache().context(),
             SNAPSHOT_NAME,
             parts,
+            encryption,
             new DelegateSnapshotSender(log, mgr0.snapshotExecutorService(),
                 mgr0.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
                 @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
@@ -314,6 +316,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
                 @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                     try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
new file mode 100644
index 0000000..8d78836
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
+        int rqCnt = 10;
+
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), grid(1).localNode().id());
+
+        awaitPartitionMapExchange();
+
+        CountDownLatch latch = new CountDownLatch(parts.values().stream().mapToInt(Set::size).sum() * rqCnt);
+        GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<?> runFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                Map<Integer, Set<Integer>> parts0 = new HashMap<>();
+
+                for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                    parts0.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
+
+                IgniteInternalFuture<Void> locFut = null;
+
+                compFut.add(locFut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+                    SNAPSHOT_NAME,
+                    parts,
+                    () -> false,
+                    defaultPartitionConsumer(parts0, latch)));
+
+                locFut.listen(f -> assertEquals("All partitions must be handled: " + parts0,
+                    F.size(parts0.values(), Set::isEmpty), parts0.size()));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }, rqCnt, "rq-creator-");
+
+        runFut.get(TIMEOUT);
+        U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+        compFut.markInitialized().get(TIMEOUT);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestEachOther() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        IgniteSnapshotManager mgr0 = snp(ignite);
+        IgniteSnapshotManager mgr1 = snp(grid(1));
+
+        UUID node0 = grid(0).localNode().id();
+        UUID node1 = grid(1).localNode().id();
+
+        Map<Integer, Set<Integer>> fromNode1 = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), node1);
+        Map<Integer, Set<Integer>> fromNode0 = owningParts(grid(1), CU.cacheId(DEFAULT_CACHE_NAME), node0);
+
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g)
+            .blockMessages((n, msg) -> msg instanceof SnapshotFilesRequestMessage));
+
+        CountDownLatch latch = new CountDownLatch(fromNode1.values().stream().mapToInt(Set::size).sum() +
+            fromNode0.values().stream().mapToInt(Set::size).sum());
+
+        // Snapshot must be taken on node1 and transmitted to node0.
+        IgniteInternalFuture<?> futFrom1To0 = mgr0.requestRemoteSnapshotFiles(node1, SNAPSHOT_NAME, fromNode1, () -> false,
+            defaultPartitionConsumer(fromNode1, latch));
+        IgniteInternalFuture<?> futFrom0To1 = mgr1.requestRemoteSnapshotFiles(node0, SNAPSHOT_NAME, fromNode0, () -> false,
+            defaultPartitionConsumer(fromNode0, latch));
+
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g).stopBlock());
+
+        latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+        futFrom0To1.get(TIMEOUT);
+        futFrom1To0.get(TIMEOUT);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testRemoteRequestedInitiatorNodeLeft() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        awaitPartitionMapExchange();
+
+        IgniteSnapshotManager mgr1 = snp(grid(1));
+        UUID rmtNodeId = grid(1).localNode().id();
+        UUID locNodeId = grid(0).localNode().id();
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), rmtNodeId);
+
+        CountDownLatch sndLatch = new CountDownLatch(1);
+
+        mgr1.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, mgr1.snapshotExecutorService(), mgr1.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                        if (partId(part.getName()) > 0) {
+                            try {
+                                sndLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+                            }
+                            catch (Exception e) {
+                                throw new IgniteException(e);
+                            }
+                        }
+
+                        super.sendPart0(part, cacheDirName, pair, length);
+                    }
+                };
+            }
+        });
+
+        snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+            SNAPSHOT_NAME,
+            parts,
+            () -> false,
+            (part, t) -> {
+            });
+
+        IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];
+
+        assertTrue(waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                IgniteInternalFuture<?> snpFut = snp(grid(1)).lastScheduledSnapshotResponseRemoteTask(locNodeId);
+
+                if (snpFut == null)
+                    return false;
+                else {
+                    futs[0] = snpFut;
+
+                    return true;
+                }
+            }
+        }, 5_000L));
+
+        stopGrid(0);
+        sndLatch.countDown();
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> futs[0].get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
+            grid(1).localNode().id());
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+            SNAPSHOT_NAME,
+            parts,
+            () -> false,
+            (part, t) -> {
+                if (t == null) {
+                    int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+                    assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+                    assertTrue("Received partition has not been requested",
+                        parts.get(grpId).contains(partId(part.getName())));
+
+                    try {
+                        U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+                else {
+                    assertTrue(t instanceof ClusterTopologyCheckedException);
+                    assertNull(part);
+                }
+            });
+
+        stopGrid(1);
+
+        latch.countDown();
+
+        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
+            "he node from which a snapshot has been requested left the grid");
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRequestRemoteCancel() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
+            grid(1).localNode().id());
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean stopChecker = new AtomicBoolean();
+
+        IgniteInternalFuture<Void> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+            SNAPSHOT_NAME,
+            parts,
+            stopChecker::get,
+            (part, t) -> {
+                try {
+                    U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+
+        IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];
+
+        assertTrue(waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                IgniteInternalFuture<?> snpFut = snp(grid(1))
+                    .lastScheduledSnapshotResponseRemoteTask(grid(0).localNode().id());
+
+                if (snpFut == null)
+                    return false;
+                else {
+                    futs[0] = snpFut;
+
+                    return true;
+                }
+            }
+        }, 5_000L));
+
+        stopChecker.set(true);
+        latch.countDown();
+
+        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), TransmissionCancelledException.class,
+            "Future cancelled prior to the all requested partitions processed");
+    }
+
+    /**
+     * @param parts Expected partitions.
+     * @param latch Latch counted down for each processed partition.
+     * @return Consumer.
+     */
+    private static BiConsumer<File, Throwable> defaultPartitionConsumer(Map<Integer, Set<Integer>> parts, CountDownLatch latch) {
+        return (part, t) -> {
+            assertNull(t);
+
+            int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+            assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+            assertTrue("Received partition has not been requested",
+                parts.get(grpId).remove(partId(part.getName())));
+
+            latch.countDown();
+        };
+    }
+
+    /**
+     * @param src Node whose partition topology is used for the calculation.
+     * @param grpId Group id to collect OWNING partitions for.
+     * @param rmtNodeId Remote node id.
+     * @return Map of cache group id to the collected partition ids.
+     */
+    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, int grpId, UUID rmtNodeId) {
+        return Collections.singletonMap(grpId, src.context()
+            .cache()
+            .cacheGroup(grpId)
+            .topology()
+            .partitions(rmtNodeId)
+            .entrySet()
+            .stream()
+            .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet()));
+    }
+}
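
The tests above exercise the new IgniteSnapshotManager#requestRemoteSnapshotFiles API: a map of cache group ids to partition ids, a stop-condition supplier, and a per-file callback that receives either the transmitted partition file or the failure. A minimal call sketch (illustrative only; the partition ids and log messages are made up, and it assumes the same test harness and imports as above):

    // Sketch only: request selected partitions of DEFAULT_CACHE_NAME from a remote node.
    UUID rmtNodeId = grid(1).localNode().id();

    Map<Integer, Set<Integer>> parts =
        Collections.singletonMap(CU.cacheId(DEFAULT_CACHE_NAME), Collections.singleton(0));

    IgniteInternalFuture<Void> fut = snp(grid(0)).requestRemoteSnapshotFiles(
        rmtNodeId,
        SNAPSHOT_NAME,
        parts,               // Cache group id -> partition ids to fetch.
        () -> false,         // Stop checker: return true to cancel the transmission.
        (partFile, err) -> { // Invoked per received partition file, or with the error.
            if (err == null)
                log.info("Received partition file: " + partFile.getAbsolutePath());
            else
                log.error("Snapshot file transmission failed", err);
        });

    fut.get(TIMEOUT);
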
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
new file mode 100644
index 0000000..2c532fd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** */
+    private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+    /** */
+    private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+    /** */
+    private static final String CACHE_WITH_NODE_FILTER = "cacheWithFilter";
+
+    /** Node filter to test restoring on some of the nodes only. */
+    private static final IgnitePredicate<ClusterNode> ZERO_SUFFIX_NODE_FILTER = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.consistentId().toString().endsWith("0");
+        }
+    };
+
+    /** {@code true} if snapshot parts have been initialized on test-class startup. */
+    private static boolean inited;
+
+    /** Snapshot parts on dedicated cluster. Each part has its own local directory. */
+    private static final Set<Path> snpParts = new HashSet<>();
+
+    /** */
+    private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
+        new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
+            @Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
+                return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
+                    prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+            }
+        };
+
+    /** Cache value builder. */
+    private final Function<Integer, Object> valBuilder = String::valueOf;
+
+    /** @throws Exception If fails. */
+    @Before
+    public void prepareDedicatedSnapshot() throws Exception {
+        if (!inited) {
+            cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+
+            CacheConfiguration<Integer, Object> cacheCfg1 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+            CacheConfiguration<Integer, Object> cacheCfg2 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+            CacheConfiguration<Integer, Object> cacheCfg3 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE_WITH_NODE_FILTER))
+                    .setBackups(1)
+                    .setAffinity(new RendezvousAffinityFunction(false, 16))
+                    .setNodeFilter(ZERO_SUFFIX_NODE_FILTER);
+
+            IgniteEx ignite = startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE, valBuilder,
+                dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2, cacheCfg3);
+
+            ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+            awaitPartitionMapExchange();
+            stopAllGrids();
+
+            snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX, SNAPSHOT_NAME));
+
+            inited = true;
+        }
+
+        beforeTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If fails. */
+    @After
+    public void afterSwitchSnapshot() throws Exception {
+        afterTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** */
+    @AfterClass
+    public static void cleanupSnapshot() {
+        snpParts.forEach(U::delete);
+        cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreAllGroups() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
+        // Restore all cache groups.
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertCacheKeys(scc.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE1), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE2), CACHE_KEYS_RANGE);
+
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        List<Object> msgs = new ArrayList<>();
+
+        for (Ignite g : G.allGrids())
+            msgs.addAll(TestRecordingCommunicationSpi.spi(g).recordedMessages(true));
+
+        assertPartitionsDuplicates(msgs);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreNoRebalance() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(GridDhtPartitionDemandMessage.class);
+
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE_WITH_NODE_FILTER)).get(TIMEOUT);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertCacheKeys(scc.cache(CACHE_WITH_NODE_FILTER), CACHE_KEYS_RANGE);
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        for (Ignite g : G.allGrids())
+            assertTrue(TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        IgniteSnapshotManager mgr = snp(grid(1));
+        mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                        if (partId(part.getName()) > 0)
+                            throw new IgniteException("Test exception. Uploading partition file failed: " + pair);
+
+                        super.sendPart0(part, cacheDirName, pair, length);
+                    }
+                };
+            }
+        });
+
+        IgniteFuture<?> fut = grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            () -> fut.get(TIMEOUT),
+            IgniteException.class,
+            "Test exception. Uploading partition file failed");
+        assertNull(scc.cache(DEFAULT_CACHE_NAME));
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /**
+     * @param snpParts Snapshot parts.
+     * @param toNodes List of toNodes to copy parts to.
+     */
+    private static void copyAndShuffle(Set<Path> snpParts, List<Ignite> toNodes) {
+        AtomicInteger cnt = new AtomicInteger();
+
+        snpParts.forEach(p -> {
+            try {
+                IgniteEx loc = (IgniteEx)toNodes.get(cnt.getAndIncrement() % toNodes.size());
+                String snpName = p.getFileName().toString();
+
+                U.copy(p.toFile(),
+                    Paths.get(resolveSnapshotWorkDirectory(loc.configuration()).getAbsolutePath(), snpName).toFile(),
+                    false);
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        });
+    }
+
+    /**
+     * @param clusterPrefix Prefixes of the work directories to clean up.
+     */
+    private static void cleanupDedicatedPersistenceDirs(String... clusterPrefix) {
+        for (String prefix : clusterPrefix) {
+            try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+                path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+            ) {
+                for (Path dir : ds)
+                    U.delete(dir);
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * @param prefix Cluster work directory prefix.
+     * @param snpName Snapshot name.
+     * @return Collection of dedicated snapshot paths located in the Ignite work directory.
+     */
+    private static Set<Path> findSnapshotParts(String prefix, String snpName) {
+        Set<Path> snpPaths = new HashSet<>();
+
+        try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+            path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+        ) {
+            for (Path dir : ds)
+                snpPaths.add(searchDirectoryRecursively(dir, snpName)
+                    .orElseThrow(() -> new IgniteException("Snapshot not found in the Ignite work directory " +
+                        "[dir=" + dir.toString() + ", snpName=" + snpName + ']')));
+
+            return snpPaths;
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param prefix Grid work directory prefix.
+     * @param grids Number of Ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param valMapper Factory which produces values.
+     * @param ccfgs Cache configurations.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    private <V> IgniteEx startDedicatedGridsWithCache(
+        String prefix,
+        int grids,
+        int keys,
+        Function<Integer, V> valMapper,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        return startGridsWithCache(grids,
+            keys,
+            valMapper,
+            CLUSTER_DIR.apply(prefix),
+            ccfgs);
+    }
+
+    /**
+     * @param prefix Grid work directory prefix.
+     * @param grids Number of Ignite instances to start.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    private IgniteEx startDedicatedGrids(String prefix, int grids) throws Exception {
+        for (int g = 0; g < grids; g++)
+            startDedicatedGrid(prefix, g);
+
+        grid(0).events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT);
+
+        return grid(0);
+    }
+
+    /**
+     * @param prefix Grid work directory prefix.
+     * @param id Grid index.
+     * @return Grid instance.
+     * @throws Exception If fails.
+     */
+    private IgniteEx startDedicatedGrid(String prefix, int id) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(id)));
+        cfg.setWorkDirectory(CLUSTER_DIR.apply(prefix).apply(id, cfg));
+
+        return startGrid(cfg);
+    }
+
+    /**
+     * @return Default work directory.
+     */
+    private static Path defaultWorkDirectory() {
+        try {
+            return Paths.get(U.defaultWorkDirectory());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    private static void assertPartitionsDuplicates(List<Object> msgs) {
+        List<GroupPartitionId> all = new ArrayList<>();
+
+        for (Object o : msgs) {
+            SnapshotFilesRequestMessage msg0 = (SnapshotFilesRequestMessage)o;
+            Map<Integer, Set<Integer>> parts = msg0.parts();
+
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                for (Integer partId : e.getValue())
+                    all.add(new GroupPartitionId(e.getKey(), partId));
+            }
+        }
+
+        assertEquals(all.size(), new HashSet<>(all).size());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 402f2f7..12dc32d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
 import org.apache.ignite.internal.processors.performancestatistics.CacheStartTest;
 import org.apache.ignite.internal.processors.performancestatistics.CheckpointTest;
@@ -101,12 +103,14 @@ import org.junit.runners.Suite;
 
     IgniteSnapshotManagerSelfTest.class,
     IgniteClusterSnapshotSelfTest.class,
+    IgniteSnapshotRemoteRequestTest.class,
     IgniteClusterSnapshotCheckTest.class,
     IgniteSnapshotWithMetastorageTest.class,
     IgniteSnapshotMXBeanTest.class,
     IgniteClusterSnapshotRestoreSelfTest.class,
     IgniteClusterSnapshotHandlerTest.class,
     EncryptedSnapshotTest.class,
+    IgniteSnapshotRestoreFromRemoteTest.class,
 
     IgniteClusterIdTagTest.class,
     FullyConnectedComponentSearcherTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
index 11f5f8c..7db3ff6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -35,11 +35,14 @@ import org.apache.ignite.events.SnapshotEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
 import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
 import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
@@ -54,9 +57,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** Number of cache keys to pre-create at node start. */
     private static final int CACHE_KEYS_RANGE = 10_000;
 
-    /** Cache value builder. */
-    private Function<Integer, Object> valBuilder = new BinaryValueBuilder(TYPE_NAME);
-
     /** {@inheritDoc} */
     @Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
         return super.txCacheConfig(ccfg).setSqlIndexMaxInlineSize(255).setSqlSchema("PUBLIC")
@@ -67,11 +67,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
                 .setIndexes(Collections.singletonList(new QueryIndex("id")))));
     }
 
-    /** {@inheritDoc} */
-    @Override protected Function<Integer, Object> valueBuilder() {
-        return valBuilder;
-    }
-
     /** @throws Exception If failed. */
     @Test
     public void testBasicClusterSnapshotRestore() throws Exception {
@@ -82,6 +77,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
         assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertRebuildIndexes(client.cache(DEFAULT_CACHE_NAME), false);
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
     }
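
The restore path exercised by this test is the public IgniteSnapshot API. A minimal sketch of the same call sequence outside the test harness, assuming illustrative configuration, snapshot and cache names:

    import java.util.Collections;

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;

    public class SnapshotRestoreSketch {
        public static void main(String[] args) {
            // Persistence must be enabled and the cluster active for snapshot operations.
            try (Ignite ignite = Ignition.start("config/example-ignite.xml")) {
                // Cluster-wide snapshot; the future completes when all nodes finish.
                ignite.snapshot().createSnapshot("testSnapshot").get();

                // Cache groups being restored must not be present in the cluster.
                ignite.destroyCache("myCache");

                ignite.snapshot().restoreSnapshot("testSnapshot",
                    Collections.singleton("myCache")).get();
            }
        }
    }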
@@ -89,6 +85,8 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** @throws Exception If failed. */
     @Test
     public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception {
+        valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
         IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
 
         // Remove metadata.
@@ -101,6 +99,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
         assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+        assertRebuildIndexes(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), false);
 
         for (Ignite grid : G.allGrids())
             assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId));
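
BinaryValueBuilder here is a helper from the snapshot test base classes and is not part of this diff; a builder of the same shape can be sketched over the public IgniteBinary API (the field name below is an assumption):

    import java.util.function.Function;

    import org.apache.ignite.Ignite;

    // Produces binary values of the given type, keyed by an integer. Building
    // the first value registers the type's binary metadata in the cluster,
    // which is what the test removes and expects the restore to bring back.
    class BinaryValueBuilderSketch implements Function<Integer, Object> {
        private final Ignite ignite;
        private final String typeName;

        BinaryValueBuilderSketch(Ignite ignite, String typeName) {
            this.ignite = ignite;
            this.typeName = typeName;
        }

        @Override public Object apply(Integer key) {
            return ignite.binary().builder(typeName)
                .setField("id", key) // Assumed field name.
                .build();
        }
    }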
@@ -111,9 +110,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** @throws Exception If failed. */
     @Test
     public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
+        valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
         int nodesCnt = 4;
 
-        startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+        startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
 
         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
 
@@ -146,7 +147,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
 
         awaitPartitionMapExchange();
 
-        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+        for (Ignite g : G.allGrids())
+            ofNullable(indexRebuildFuture((IgniteEx)g, CU.cacheId(DEFAULT_CACHE_NAME))).orElse(new GridFinishedFuture<>()).get(TIMEOUT);
+
+        for (Ignite g : G.allGrids())
+            assertCacheKeys(g.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
 
         GridTestUtils.waitForCondition(() -> evts.size() == 2, TIMEOUT);
         assertEquals(2, evts.size());
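
indexRebuildFuture(...) may return null on nodes where no index rebuild was scheduled, which is why the loop above falls back to a GridFinishedFuture before blocking. The same guard spelled out without Optional, assuming the helper returns an IgniteInternalFuture and the code runs inside a test method declared with throws Exception:

    for (Ignite g : G.allGrids()) {
        // Null means no rebuild was started on this node.
        IgniteInternalFuture<?> rebuildFut =
            indexRebuildFuture((IgniteEx)g, CU.cacheId(DEFAULT_CACHE_NAME));

        if (rebuildFut == null)
            rebuildFut = new GridFinishedFuture<>();

        // Completes immediately for the finished-future fallback.
        rebuildFut.get(TIMEOUT);
    }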
@@ -168,12 +173,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
 
             String nodeId = ctx.localNodeId().toString();
 
-            assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone());
-
-            // Make sure no index rebuild happened.
-            assertEquals("nodeId=" + nodeId,
-                0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed());
-
             GridQueryProcessor qry = ((IgniteEx)grid).context().query();
 
             // Make sure SQL works fine.
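
The SQL sanity check goes through the internal GridQueryProcessor; an equivalent check over the public cache query API would look roughly like the sketch below (the table name is an assumption derived from the value type):

    import java.util.List;

    import org.apache.ignite.cache.query.SqlFieldsQuery;

    // Run a trivial query against the restored, indexed table to confirm
    // that the SQL engine sees the restored data on this node.
    List<List<?>> rows = grid.cache(cache.getName())
        .query(new SqlFieldsQuery("SELECT count(*) FROM Value"))
        .getAll();

    assertFalse("nodeId=" + nodeId, rows.isEmpty());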
@@ -188,6 +187,23 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         }
     }
 
+    /**
+     * @param cache Ignite cache.
+     * @param rebuild {@code True} if an index rebuild is expected to have happened on each node.
+     */
+    private void assertRebuildIndexes(IgniteCache<Object, Object> cache, boolean rebuild) {
+        for (Ignite grid : G.allGrids()) {
+            GridKernalContext ctx = ((IgniteEx)grid).context();
+
+            assertTrue("nodeId=" + ctx.localNodeId(), grid.cache(cache.getName()).indexReadyFuture().isDone());
+
+            // Verify that the observed index rebuild activity matches the expectation.
+            assertEquals("nodeId=" + ctx.localNodeId(),
+                rebuild, ctx.cache().cache(cache.getName()).context().cache().metrics0()
+                    .getIndexRebuildKeysProcessed() > 0);
+        }
+    }
+
     /** */
     private static class IndexedValueBuilder implements Function<Integer, Object> {
         /** {@inheritDoc} */