You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/11/22 07:39:55 UTC

[ignite] branch ignite-2.12 updated (93f6fc8 -> 0a7ccfa)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a change to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git.


    from 93f6fc8  IGNITE-15954 .NET: Fix dynamic assemblies handling in TypeResolver (#9574)
     new f324abc  IGNITE-15802 Remove duplicated code for affinity cache (#9519)
     new 0a7ccfa  IGNITE-14744 Restore snapshot taken on different topologies (#9539)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../DurableBackgroundCleanupIndexTreeTask.java     |   1 +
 .../communication/AbstractTransmission.java        |   4 +-
 .../managers/communication/FileReceiver.java       |   3 +-
 .../managers/communication/GridIoManager.java      |  28 +-
 .../communication/GridIoMessageFactory.java        |   4 +
 .../communication/TransmissionHandler.java         |   7 +-
 .../managers/discovery/GridDiscoveryManager.java   |   7 +-
 .../affinity/GridAffinityAssignmentCache.java      |  22 +-
 .../cache/CacheAffinitySharedManager.java          |  97 +--
 .../processors/cache/CacheGroupContext.java        |  23 +-
 .../processors/cache/ClusterCachesInfo.java        |   8 +-
 .../internal/processors/cache/ExchangeActions.java |   4 +-
 .../processors/cache/GridCacheContextInfo.java     |   2 +-
 .../internal/processors/cache/GridCacheUtils.java  |  16 +
 .../persistence/file/FilePageStoreManager.java     |  32 +-
 .../snapshot/AbstractSnapshotFutureTask.java       | 136 +++
 .../snapshot/AbstractSnapshotMessage.java}         |  73 +-
 .../snapshot/IgniteSnapshotManager.java            | 872 +++++++++++++++++--
 .../snapshot/IgniteSnapshotVerifyException.java    |   3 +
 .../snapshot/SnapshotFilesFailureMessage.java}     |  76 +-
 .../snapshot/SnapshotFilesRequestMessage.java      | 174 ++++
 .../SnapshotFinishedFutureTask.java}               |  30 +-
 .../persistence/snapshot/SnapshotFutureTask.java   | 117 +--
 .../persistence/snapshot/SnapshotMetadata.java     |  57 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |   9 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java | 172 ++++
 .../snapshot/SnapshotRestoreProcess.java           | 962 +++++++++++++++------
 .../util/distributed/DistributedProcess.java       |   5 +
 .../apache/ignite/internal/util/lang/GridFunc.java |   2 +-
 .../snapshot/AbstractSnapshotSelfTest.java         |  90 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   |   1 +
 .../IgniteClusterSnapshotRestoreBaseTest.java      |  36 +
 .../IgniteClusterSnapshotRestoreSelfTest.java      |  77 +-
 .../snapshot/IgniteSnapshotManagerSelfTest.java    |   5 +-
 .../snapshot/IgniteSnapshotRemoteRequestTest.java  | 336 +++++++
 .../IgniteSnapshotRestoreFromRemoteTest.java       | 376 ++++++++
 .../IgniteBasicWithPersistenceTestSuite.java       |   4 +
 ...niteClusterSnapshotRestoreWithIndexingTest.java |  48 +-
 38 files changed, 3204 insertions(+), 715 deletions(-)
 create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
 copy modules/core/src/main/java/org/apache/ignite/{spi/collision/jobstealing/JobStealingRequest.java => internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java} (62%)
 copy modules/core/src/main/java/org/apache/ignite/internal/processors/cache/{distributed/near/GridNearUnlockRequest.java => persistence/snapshot/SnapshotFilesFailureMessage.java} (56%)
 create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
 copy modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/{metastorage/MetastorageSearchRow.java => snapshot/SnapshotFinishedFutureTask.java} (65%)
 create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
 create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
 create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java

[ignite] 01/02: IGNITE-15802 Remove duplicated code for affinity cache (#9519)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit f324abc452ae130adf9a974201170d63a5878dc8
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Tue Nov 2 23:32:18 2021 +0300

    IGNITE-15802 Remove duplicated code for affinity cache (#9519)
---
 .../DurableBackgroundCleanupIndexTreeTask.java     |  1 +
 .../managers/discovery/GridDiscoveryManager.java   |  7 +-
 .../affinity/GridAffinityAssignmentCache.java      | 22 ++++-
 .../cache/CacheAffinitySharedManager.java          | 97 ++++------------------
 .../processors/cache/CacheGroupContext.java        | 21 +----
 .../processors/cache/ClusterCachesInfo.java        |  4 +-
 .../processors/cache/GridCacheContextInfo.java     |  2 +-
 .../internal/processors/cache/GridCacheUtils.java  | 16 ++++
 .../persistence/file/FilePageStoreManager.java     |  2 +-
 9 files changed, 59 insertions(+), 113 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
index e568dad..46134d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
@@ -54,6 +54,7 @@ import static org.apache.ignite.internal.metric.IoStatisticsType.SORTED_INDEX;
  *
  * @deprecated Use {@link DurableBackgroundCleanupIndexTreeTaskV2}.
  */
+@Deprecated
 public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundTask {
     /** */
     private static final long serialVersionUID = 0L;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 007dfed..0041084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -3602,12 +3602,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (grpAff.persistentCacheGrp && bltNodes != null && !bltNodes.contains(node.id())) // Filter out.
                         continue;
 
-                    List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);
-
-                    if (nodes == null)
-                        cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>());
-
-                    nodes.add(node);
+                    cacheGrpAffNodes.computeIfAbsent(grpId, id -> new ArrayList<>()).add(node);
                 }
             }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 78d3b86..c8d5da3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -59,6 +60,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_S
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
 import static org.apache.ignite.IgniteSystemProperties.getFloat;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
@@ -163,7 +165,7 @@ public class GridAffinityAssignmentCache {
      * @param backups Number of backups.
      * @param locCache Local cache flag.
      */
-    public GridAffinityAssignmentCache(GridKernalContext ctx,
+    private GridAffinityAssignmentCache(GridKernalContext ctx,
         String cacheOrGrpName,
         int grpId,
         AffinityFunction aff,
@@ -196,6 +198,22 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param ctx Kernal context.
+     * @param aff Initialized affinity function.
+     * @param ccfg Cache configuration.
+     * @return Affinity assignment cache instance.
+     */
+    public static GridAffinityAssignmentCache create(GridKernalContext ctx, AffinityFunction aff, CacheConfiguration<?, ?> ccfg) {
+        return new GridAffinityAssignmentCache(ctx,
+            CU.cacheOrGroupName(ccfg),
+            CU.cacheGroupId(ccfg),
+            aff,
+            ccfg.getNodeFilter(),
+            ccfg.getBackups(),
+            ccfg.getCacheMode() == LOCAL);
+    }
+
+    /**
      * @return Key to find caches with similar affinity.
      */
     public Object similarAffinityKey() {
@@ -338,7 +356,7 @@ public class GridAffinityAssignmentCache {
         if (!locCache) {
             sorted = new ArrayList<>(discoCache.cacheGroupAffinityNodes(groupId()));
 
-            Collections.sort(sorted, NodeOrderComparator.getInstance());
+            sorted.sort(NodeOrderComparator.getInstance());
         }
         else
             sorted = Collections.singletonList(ctx.discovery().localNode());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 706d7e6..bd2c4f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -658,7 +658,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 assert grpHolder != null && !grpHolder.nonAffNode() : grpHolder;
 
                 try {
-                    grpHolder = createHolder(
+                    grpHolder = CacheGroupNoAffOrFilteredHolder.create(
                         cctx,
                         cachesRegistry.group(grpId),
                         topVer,
@@ -1984,7 +1984,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                 if (grp == null) {
-                    grpHolder = createHolder(cctx, desc, topVer, null);
+                    grpHolder = CacheGroupNoAffOrFilteredHolder.create(cctx, desc, topVer, null);
 
                     final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
@@ -2135,9 +2135,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert (affNode && grp != null) || (!affNode && grp == null);
 
-        CacheGroupHolder cacheGrp = affNode ?
-            new CacheGroupAffNodeHolder(grp) :
-            createHolder(cctx, desc, topVer, null);
+        CacheGroupHolder cacheGrp = affNode ? new CacheGroupAffNodeHolder(grp) :
+            CacheGroupNoAffOrFilteredHolder.create(cctx, desc, topVer, null);
 
         CacheGroupHolder old = grpHolders.put(desc.groupId(), cacheGrp);
 
@@ -2642,7 +2641,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      *
      */
-    abstract class CacheGroupHolder {
+    private abstract static class CacheGroupHolder {
         /** */
         private final GridAffinityAssignmentCache aff;
 
@@ -2656,7 +2655,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          */
         CacheGroupHolder(boolean rebalanceEnabled,
             GridAffinityAssignmentCache aff,
-            @Nullable GridAffinityAssignmentCache initAff) {
+            @Nullable GridAffinityAssignmentCache initAff
+        ) {
             this.aff = aff;
 
             if (initAff != null)
@@ -2678,13 +2678,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
-         * @return Partitions number.
-         */
-        int partitions() {
-            return aff.partitions();
-        }
-
-        /**
          * @param discoCache Discovery data cache.
          * @return Cache topology.
          */
@@ -2701,7 +2694,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created cache is started on coordinator.
      */
-    private class CacheGroupAffNodeHolder extends CacheGroupHolder {
+    private static class CacheGroupAffNodeHolder extends CacheGroupHolder {
         /** */
         private final CacheGroupContext grp;
 
@@ -2738,9 +2731,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created if cache is not started on coordinator.
      */
-    private class CacheGroupNoAffOrFilteredHolder extends CacheGroupHolder {
+    private static class CacheGroupNoAffOrFilteredHolder extends CacheGroupHolder {
         /** */
-        private final GridCacheSharedContext cctx;
+        private final GridCacheSharedContext<?, ?> cctx;
 
         /**
          * @param rebalanceEnabled Rebalance flag.
@@ -2750,7 +2743,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          */
         CacheGroupNoAffOrFilteredHolder(
             boolean rebalanceEnabled,
-            GridCacheSharedContext cctx,
+            GridCacheSharedContext<?, ?> cctx,
             GridAffinityAssignmentCache aff,
             @Nullable GridAffinityAssignmentCache initAff
         ) {
@@ -2763,27 +2756,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param cctx Context.
          * @param grpDesc Cache group descriptor.
          * @param topVer Current exchange version.
-         * @return Cache holder.
-         * @throws IgniteCheckedException If failed.
-         */
-        CacheGroupNoAffOrFilteredHolder create(
-            GridCacheSharedContext cctx,
-            CacheGroupDescriptor grpDesc,
-            AffinityTopologyVersion topVer
-        ) throws IgniteCheckedException {
-            return create(cctx, grpDesc, topVer, null);
-        }
-
-        /**
-         * @param cctx Context.
-         * @param grpDesc Cache group descriptor.
-         * @param topVer Current exchange version.
          * @param initAff Current affinity.
          * @return Cache holder.
          * @throws IgniteCheckedException If failed.
          */
-        CacheGroupNoAffOrFilteredHolder create(
-            GridCacheSharedContext cctx,
+        static CacheGroupNoAffOrFilteredHolder create(
+            GridCacheSharedContext<?, ?> cctx,
             CacheGroupDescriptor grpDesc,
             AffinityTopologyVersion topVer,
             @Nullable GridAffinityAssignmentCache initAff
@@ -2806,16 +2784,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             U.startLifecycleAware(F.asList(affFunc));
 
-            GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
-                grpDesc.cacheOrGroupName(),
-                grpDesc.groupId(),
-                affFunc,
-                ccfg.getNodeFilter(),
-                ccfg.getBackups(),
-                ccfg.getCacheMode() == LOCAL
-            );
-
-            return new CacheGroupNoAffOrFilteredHolder(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
+            return new CacheGroupNoAffOrFilteredHolder(ccfg.getRebalanceMode() != NONE, cctx,
+                GridAffinityAssignmentCache.create(cctx.kernalContext(), affFunc, ccfg), initAff);
         }
 
         /** {@inheritDoc} */
@@ -2829,43 +2799,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
     }
 
-    /** */
-    private CacheGroupNoAffOrFilteredHolder createHolder(
-        GridCacheSharedContext cctx,
-        CacheGroupDescriptor grpDesc,
-        AffinityTopologyVersion topVer,
-        @Nullable GridAffinityAssignmentCache initAff
-    ) throws IgniteCheckedException {
-        assert grpDesc != null;
-        assert !cctx.kernalContext().clientNode() || !CU.affinityNode(cctx.localNode(), grpDesc.config().getNodeFilter());
-
-        CacheConfiguration<?, ?> ccfg = grpDesc.config();
-
-        assert ccfg != null : grpDesc;
-        assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
-
-        assert !cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(),
-            topVer).contains(cctx.localNode()) : grpDesc.cacheOrGroupName();
-
-        AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
-
-        cctx.kernalContext().resource().injectGeneric(affFunc);
-        cctx.kernalContext().resource().injectCacheName(affFunc, ccfg.getName());
-
-        U.startLifecycleAware(F.asList(affFunc));
-
-        GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
-            grpDesc.cacheOrGroupName(),
-            grpDesc.groupId(),
-            affFunc,
-            ccfg.getNodeFilter(),
-            ccfg.getBackups(),
-            ccfg.getCacheMode() == LOCAL
-        );
-
-        return new CacheGroupNoAffOrFilteredHolder(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
-    }
-
     /**
      * Tracks rebalance state on coordinator.
      * After all partitions are rebalanced the current affinity is switched to ideal.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 3ff9ae5..6b7b06c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -758,14 +758,7 @@ public class CacheGroupContext {
      * @return Group name if it is specified, otherwise cache name.
      */
     public String cacheOrGroupName() {
-        return cacheOrGroupName(ccfg);
-    }
-
-    /**
-     * @return Group name if it is specified, otherwise cache name.
-     */
-    public static String cacheOrGroupName(CacheConfiguration<?, ?> ccfg) {
-        return ccfg.getGroupName() != null ? ccfg.getGroupName() : ccfg.getName();
+        return CU.cacheOrGroupName(ccfg);
     }
 
     /**
@@ -1060,17 +1053,7 @@ public class CacheGroupContext {
     public void start() throws IgniteCheckedException {
         GridAffinityAssignmentCache affCache = ctx.affinity().groupAffinity(grpId);
 
-        if (affCache != null)
-            aff = affCache;
-        else
-            aff = new GridAffinityAssignmentCache(ctx.kernalContext(),
-                cacheOrGroupName(),
-                grpId,
-                ccfg.getAffinity(),
-                ccfg.getNodeFilter(),
-                ccfg.getBackups(),
-                ccfg.getCacheMode() == LOCAL
-            );
+        aff = affCache == null ? GridAffinityAssignmentCache.create(ctx.kernalContext(), ccfg.getAffinity(), ccfg) : affCache;
 
         if (ccfg.getCacheMode() != LOCAL) {
             top = ctx.kernalContext().resource().resolve(new GridDhtPartitionTopologyImpl(ctx, this));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 235f69a..2841e02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -2024,7 +2024,7 @@ public class ClusterCachesInfo {
                     ", conflictingCacheName=" + desc.cacheName() + ']';
         }
 
-        int grpId = CU.cacheGroupId(cfg.getName(), cfg.getGroupName());
+        int grpId = CU.cacheGroupId(cfg);
 
         if (cfg.getGroupName() != null) {
             if (cacheGroupByName(cfg.getGroupName()) == null) {
@@ -2284,7 +2284,7 @@ public class ClusterCachesInfo {
             }
         }
 
-        int grpId = CU.cacheGroupId(startedCacheCfg.getName(), startedCacheCfg.getGroupName());
+        int grpId = CU.cacheGroupId(startedCacheCfg);
 
         Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
index 6a5f6c6..3892fbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java
@@ -98,7 +98,7 @@ public class GridCacheContextInfo<K, V> {
      * @return Cache group name.
      */
     public String groupName() {
-        return CacheGroupContext.cacheOrGroupName(config);
+        return CU.cacheOrGroupName(config);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 8642cc7..efa9ccf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1110,6 +1110,22 @@ public class GridCacheUtils {
     }
 
     /**
+     * @param ccfg Cache configuration.
+     * @return Group ID.
+     */
+    public static int cacheGroupId(CacheConfiguration<?, ?> ccfg) {
+        return CU.cacheId(cacheOrGroupName(ccfg));
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @return Group name if it is specified, otherwise cache name.
+     */
+    public static String cacheOrGroupName(CacheConfiguration<?, ?> ccfg) {
+        return ccfg.getGroupName() == null ? ccfg.getName() : ccfg.getGroupName();
+    }
+
+    /**
      * Convert TTL to expire time.
      *
      * @param ttl TTL.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index f749bf4..7e2ff26 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -1182,7 +1182,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     public static String cacheDirName(CacheConfiguration<?, ?> ccfg) {
         boolean isSharedGrp = ccfg.getGroupName() != null;
 
-        return cacheDirName(isSharedGrp, isSharedGrp ? ccfg.getGroupName() : ccfg.getName());
+        return cacheDirName(isSharedGrp, CU.cacheOrGroupName(ccfg));
     }
 
     /**

[ignite] 02/02: IGNITE-14744 Restore snapshot taken on different topologies (#9539)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch ignite-2.12
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 0a7ccfa12dee96f4eee11fa2a78399fdde8890a8
Author: Maxim Muzafarov <mm...@apache.org>
AuthorDate: Sat Nov 20 16:14:25 2021 +0300

    IGNITE-14744 Restore snapshot taken on different topologies (#9539)
---
 .../communication/AbstractTransmission.java        |   4 +-
 .../managers/communication/FileReceiver.java       |   3 +-
 .../managers/communication/GridIoManager.java      |  28 +-
 .../communication/GridIoMessageFactory.java        |   4 +
 .../communication/TransmissionHandler.java         |   7 +-
 .../processors/cache/CacheGroupContext.java        |   2 +-
 .../processors/cache/ClusterCachesInfo.java        |   4 +-
 .../internal/processors/cache/ExchangeActions.java |   4 +-
 .../persistence/file/FilePageStoreManager.java     |  30 +-
 .../snapshot/AbstractSnapshotFutureTask.java       | 136 +++
 .../snapshot/AbstractSnapshotMessage.java          | 108 +++
 .../snapshot/IgniteSnapshotManager.java            | 872 +++++++++++++++++--
 .../snapshot/IgniteSnapshotVerifyException.java    |   3 +
 .../snapshot/SnapshotFilesFailureMessage.java      | 134 +++
 .../snapshot/SnapshotFilesRequestMessage.java      | 174 ++++
 ...eption.java => SnapshotFinishedFutureTask.java} |  38 +-
 .../persistence/snapshot/SnapshotFutureTask.java   | 117 +--
 .../persistence/snapshot/SnapshotMetadata.java     |  57 +-
 .../snapshot/SnapshotPartitionsVerifyHandler.java  |   9 +-
 .../snapshot/SnapshotResponseRemoteFutureTask.java | 172 ++++
 .../snapshot/SnapshotRestoreProcess.java           | 962 +++++++++++++++------
 .../util/distributed/DistributedProcess.java       |   5 +
 .../apache/ignite/internal/util/lang/GridFunc.java |   2 +-
 .../snapshot/AbstractSnapshotSelfTest.java         |  90 +-
 .../snapshot/IgniteClusterSnapshotCheckTest.java   |   1 +
 .../IgniteClusterSnapshotRestoreBaseTest.java      |  36 +
 .../IgniteClusterSnapshotRestoreSelfTest.java      |  77 +-
 .../snapshot/IgniteSnapshotManagerSelfTest.java    |   5 +-
 .../snapshot/IgniteSnapshotRemoteRequestTest.java  | 336 +++++++
 .../IgniteSnapshotRestoreFromRemoteTest.java       | 376 ++++++++
 .../IgniteBasicWithPersistenceTestSuite.java       |   4 +
 ...niteClusterSnapshotRestoreWithIndexingTest.java |  48 +-
 32 files changed, 3303 insertions(+), 545 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
index bd1da54..aed45f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/AbstractTransmission.java
@@ -57,9 +57,9 @@ abstract class AbstractTransmission implements Closeable {
         int chunkSize
     ) {
         A.notNull(meta, "Initial file meta cannot be null");
-        A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
+        A.notNullOrEmpty(meta.name(), "Transmission name cannot be empty or null");
         A.ensure(meta.offset() >= 0, "File start position cannot be negative");
-        A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
+        A.ensure(meta.count() >= 0, "Total number of bytes to transfer can't be less than zero");
         A.notNull(stopChecker, "Process stop checker cannot be null");
         A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
index 6af3ca4..c826e4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/FileReceiver.java
@@ -82,7 +82,8 @@ class FileReceiver extends TransmissionReceiver {
             fileIo.position(meta.offset());
         }
         catch (IOException e) {
-            throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
+            throw new IgniteException("Unable to open destination file. Receiver will be stopped: " +
+                file.getAbsolutePath(), e);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6b4395b..3476192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -970,7 +970,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                                     if (nodeId.equals(e.getValue().rmtNodeId)) {
                                         it.remove();
 
-                                        interruptRecevier(e.getValue(),
+                                        interruptReceiver(e.getValue(),
                                             new ClusterTopologyCheckedException("Remote node left the grid. " +
                                                 "Receiver has been stopped : " + nodeId));
                                     }
@@ -1179,7 +1179,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
 
             for (ReceiverContext rctx : rcvs) {
-                interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+                interruptReceiver(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
                     + ctx.localNodeId()));
             }
         }
@@ -1965,7 +1965,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             rcvCtx0 = rcvCtxs.remove(topic);
         }
 
-        interruptRecevier(rcvCtx0,
+        interruptReceiver(rcvCtx0,
             new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
                 "on local node [nodeId=" + ctx.localNodeId() + ']'));
     }
@@ -2787,7 +2787,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param rctx Receiver context to use.
      * @param ex Exception to close receiver with.
      */
-    private void interruptRecevier(ReceiverContext rctx, Exception ex) {
+    private void interruptReceiver(ReceiverContext rctx, Exception ex) {
         if (rctx == null)
             return;
 
@@ -2800,9 +2800,14 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             rctx.lastState = rctx.lastState == null ?
                 new TransmissionMeta(ex) : rctx.lastState.error(ex);
 
-            rctx.hnd.onException(rctx.rmtNodeId, ex);
+            if (X.hasCause(ex, TransmissionCancelledException.class)) {
+                if (log.isInfoEnabled())
+                    log.info("Transmission receiver has been cancelled [rctx=" + rctx + ']');
+            }
+            else
+                U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex);
 
-            U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
+            rctx.hnd.onException(rctx.rmtNodeId, ex);
         }
     }
 
@@ -2856,7 +2861,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     "It's not allowed to process different sessions over the same topic simultaneously. " +
                     "Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');
 
-                U.error(log, err);
+                U.error(log, "Error has been sent back to remote node. Receiver holds the local topic " +
+                    "[topic=" + topic + ", rmtNodeId=" + rmtNodeId + ", ctx=" + rcvCtx + ']', err);
 
                 out.writeObject(new TransmissionMeta(err));
 
@@ -2881,17 +2887,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
                     receiveFromChannel(topic, rcvCtx, in, out, ch);
                 else
-                    interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
+                    interruptReceiver(rcvCtxs.remove(topic), rcvCtx.lastState.error());
             }
             finally {
                 rcvCtx.lock.unlock();
             }
         }
         catch (Throwable t) {
-            U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);
-
             // Do not remove receiver context here, since sender will recconect to get this error.
-            interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
+            interruptReceiver(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
         }
         finally {
             U.closeQuiet(in);
@@ -2991,7 +2995,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
 
                 @Override public void onTimeout() {
-                    interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
+                    interruptReceiver(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
                         "waiting for the reconnect has been timeouted"));
                 }
             });
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index d39f7c3..567cf7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,6 +136,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -371,6 +373,8 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
         factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
         factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
         factory.register((short)177, TcpInverseConnectionResponseMessage::new);
+        factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new);
+        factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new);
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] - this
         // [120..123] - DR
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
index da0dd69..a3c9f2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/TransmissionHandler.java
@@ -74,13 +74,16 @@ public interface TransmissionHandler {
      *
      * @param nodeId Remote node id from which request has been received.
      * @param initMeta Initial handler meta info.
-     * @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
+     * @return Instance of read handler to process incoming data like the {@link FileChannel} manner.
      */
     public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);
 
     /**
+     * A {@link TransmissionCancelledException} is passed to this exception handler if the local transmission
+     * has been ended by a user interruption request.
+     *
      * @param nodeId Remote node id on which the error occurred.
-     * @param err The err of fail handling process.
+     * @param err The error which caused the handling process to fail.
      */
     public void onException(UUID nodeId, Throwable err);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 6b7b06c..7d6eec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -639,7 +639,7 @@ public class CacheGroupContext {
     /**
      * @return Cache shared context.
      */
-    public GridCacheSharedContext shared() {
+    public GridCacheSharedContext<?, ?> shared() {
         return ctx;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 2841e02..b8bd968 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -995,6 +995,8 @@ public class ClusterCachesInfo {
         DynamicCacheChangeRequest req,
         String cacheName
     ) {
+        assert exchangeActions != null;
+
         CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
 
         IgniteCheckedException err = null;
@@ -1038,7 +1040,7 @@ public class ClusterCachesInfo {
         if (err == null && req.restartId() == null) {
             IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
 
-            if (snapshotMgr.isRestoring(cacheName, ccfg.getGroupName())) {
+            if (snapshotMgr.isRestoring(ccfg)) {
                 err = new IgniteCheckedException("Cache start failed. A cache or group with the same name is " +
                     "currently being restored from a snapshot [cache=" + cacheName +
                     (ccfg.getGroupName() == null ? "" : ", group=" + ccfg.getGroupName()) + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index 8736a88..3b93244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -446,10 +446,10 @@ public class ExchangeActions {
      *
      */
     public static class CacheGroupActionData {
-        /** */
+        /** Cache group descriptor. */
         private final CacheGroupDescriptor desc;
 
-        /** */
+        /** Destroy flag. */
         private final boolean destroy;
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 7e2ff26..4601ec0 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -134,6 +134,12 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
 
     /** */
+    public static final Predicate<File> DATA_DIR_FILTER = dir ->
+        dir.getName().startsWith(CACHE_DIR_PREFIX) ||
+        dir.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
+        dir.getName().equals(MetaStorage.METASTORAGE_DIR_NAME);
+
+    /** */
     public static final String CACHE_DATA_FILENAME = "cache_data.dat";
 
     /** */
@@ -1015,16 +1021,34 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
         if (files == null)
             return Collections.emptyList();
 
-        return Arrays.stream(dir.listFiles())
+        return Arrays.stream(files)
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
-                f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
+            .filter(DATA_DIR_FILTER)
             .filter(f -> names.test(cacheGroupName(f)))
             .collect(Collectors.toList());
     }
 
     /**
+     * @param dir Directory to check.
+     * @param grpId Cache group id.
+     * @return Directory under {@code dir} that corresponds to the given cache group id, or {@code null} if none.
+     */
+    public static File cacheDirectory(File dir, int grpId) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return null;
+
+        return Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(DATA_DIR_FILTER)
+            .filter(f -> CU.cacheId(cacheGroupName(f)) == grpId)
+            .findAny()
+            .orElse(null);
+    }
+
+    /**
      * @param partFileName Partition file name.
      * @return Partition id.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
new file mode 100644
index 0000000..b4c65af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+    /** Shared context. */
+    protected final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Id of the node which caused the snapshot operation. */
+    protected final UUID srcNodeId;
+
+    /** Unique identifier of snapshot process. */
+    protected final String snpName;
+
+    /** Snapshot working directory on file system. */
+    protected final File tmpSnpWorkDir;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    protected final FileIOFactory ioFactory;
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    protected final SnapshotSender snpSndr;
+
+    /** Partitions to be processed. */
+    protected final Map<Integer, Set<Integer>> parts;
+
+    /** An exception which has occurred during snapshot processing. */
+    protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Id of the node which caused the snapshot task creation.
+     * @param snpName Unique identifier of snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory for working with snapshot files.
+     * @param snpSndr Snapshot data sender.
+     * @param parts Partitions to be processed.
+     */
+    protected AbstractSnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not null.";
+
+        this.cctx = cctx;
+        this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+        this.srcNodeId = srcNodeId;
+        this.snpName = snpName;
+        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+        this.ioFactory = ioFactory;
+        this.snpSndr = snpSndr;
+        this.parts = parts;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Node id which triggers this operation.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * Initiates snapshot task.
+     *
+     * @return {@code true} if task started by this call.
+     */
+    public abstract boolean start();
+
+    /**
+     * @param th An exception which occurred during snapshot processing.
+     */
+    public abstract void acceptException(Throwable th);
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() {
+        // Cancellation of snapshot future should not throw an exception.
+        acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
+            "by external process [snpName=" + snpName + ']'));
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AbstractSnapshotFutureTask.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
new file mode 100644
index 0000000..e76ca16
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Base class for snapshot messages which carry a unique request id.
+ */
+abstract class AbstractSnapshotMessage implements Message {
+    /** Unique request id. */
+    private String reqId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected AbstractSnapshotMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Unique request id.
+     */
+    protected AbstractSnapshotMessage(String reqId) {
+        assert U.alphanumericUnderscore(reqId) : reqId;
+
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Unique request id.
+     */
+    public String requestId() {
+        return reqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 0) {
+            if (!writer.writeString("reqId", reqId))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (reader.state() == 0) {
+            reqId = reader.readString("reqId");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(AbstractSnapshotMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AbstractSnapshotMessage.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7fb70ca..7207d57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -36,6 +36,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -50,26 +51,40 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.SnapshotEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFeatures;
@@ -78,6 +93,12 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -117,6 +138,7 @@ import org.apache.ignite.internal.util.GridBusyLock;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.distributed.DistributedProcess;
 import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -153,6 +175,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
 import static org.apache.ignite.internal.MarshallerContextImpl.mappingFileStoreWorkDir;
 import static org.apache.ignite.internal.MarshallerContextImpl.resolveMappingFileStoreWorkDir;
 import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings;
@@ -171,6 +194,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectories;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
 import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.DB_DEFAULT_FOLDER;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
@@ -209,6 +233,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Text Reason for checkpoint to start snapshot operation. */
     public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s";
 
+    /** Name prefix for each remote snapshot operation. */
+    public static final String RMT_SNAPSHOT_PREFIX = "snapshot_";
+
     /** Default snapshot directory for loading remote snapshots. */
     public static final String DFLT_SNAPSHOT_TMP_DIR = "snp";
 
@@ -216,8 +243,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress.";
 
     /** Error message to finalize snapshot tasks. */
-    public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " +
-        "is stopping";
+    public static final String SNP_NODE_STOPPING_ERR_MSG = "The operation is cancelled due to the local node is stopping";
 
     /** Metastorage key to save currently running snapshot. */
     public static final String SNP_RUNNING_KEY = "snapshot-running";
@@ -240,6 +266,27 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Total number of thread to perform local snapshot. */
     private static final int SNAPSHOT_THREAD_POOL_SIZE = 4;
 
+    /** Default snapshot topic to receive snapshots from remote node. */
+    private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp");
+
+    /** File transmission parameter of cache group id. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** File transmission parameter of cache partition id. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** File transmission parameter of node-sender directory path with its consistentId (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** File transmission parameter of a cache directory which is currently sending its partitions. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** File transmission parameter of the request id. */
+    private static final String RQ_ID_NAME_PARAM = "rqId";
+
+    /** Total snapshot files count which receiver should expect to receive. */
+    private static final String SNP_PARTITIONS_CNT = "partsCnt";
+
     /**
      * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
      * It is important to have only one buffer per thread (instead of creating each buffer per
@@ -249,7 +296,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     private final ThreadLocal<ByteBuffer> locBuff;
 
     /** Map of registered cache snapshot processes and their corresponding contexts. */
-    private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, AbstractSnapshotFutureTask<?>> locSnpTasks = new ConcurrentHashMap<>();
 
     /** Lock to protect the resources is used. */
     private final GridBusyLock busyLock = new GridBusyLock();
@@ -278,6 +325,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Local snapshot sender factory. */
     private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
 
+    /** Remote snapshot sender factory. */
+    private BiFunction<String, UUID, SnapshotSender> rmtSndrFactory = this::remoteSnapshotSenderFactory;
+
     /** Main snapshot directory to save created snapshots. */
     private volatile File locSnpDir;
 
@@ -314,6 +364,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Snapshot operation handlers. */
     private final SnapshotHandlers handlers = new SnapshotHandlers();
 
+    /** Manager to receive responses of remote snapshot requests. */
+    private final SequentialRemoteSnapshotManager snpRmtMgr;
+
     /**
      * @param ctx Kernal context.
      */
@@ -331,6 +384,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
 
         restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+
+        // Manage remote snapshots.
+        snpRmtMgr = new SequentialRemoteSnapshotManager();
     }
 
     /**
@@ -405,6 +461,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 "the configured via IgniteConfiguration snapshot working path.");
 
         cctx.exchange().registerExchangeAwareComponent(this);
+
         ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> {
@@ -427,19 +484,23 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                         endSnpProc.start(snpReq.requestId(), snpReq);
                     }
 
-                    for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+                    for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
                         if (sctx.sourceNodeId().equals(leftNodeId) ||
                             (reqNodeLeft && snpReq.snapshotName().equals(sctx.snapshotName())))
                             sctx.acceptException(new ClusterTopologyCheckedException(err));
                     }
 
                     restoreCacheGrpProc.onNodeLeft(leftNodeId);
+                    snpRmtMgr.onNodeLeft(leftNodeId);
                 }
             }
             finally {
                 busyLock.leaveBusy();
             }
         }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
+        cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, snpRmtMgr);
     }
 
     /** {@inheritDoc} */
@@ -450,11 +511,13 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             restoreCacheGrpProc.interrupt(new NodeStoppingException("Node is stopping."));
 
             // Try stop all snapshot processing if not yet.
-            for (SnapshotFutureTask sctx : locSnpTasks.values())
+            for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values())
                 sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
 
             locSnpTasks.clear();
 
+            snpRmtMgr.stop();
+
             synchronized (snpOpMux) {
                 if (clusterSnpFut != null) {
                     clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG));
@@ -466,6 +529,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             if (snpRunner != null)
                 snpRunner.shutdownNow();
 
+            cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC);
+            cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC);
+
             if (discoLsnr != null)
                 cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr);
 
@@ -639,7 +705,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             parts.put(grpId, null);
         }
 
-        IgniteInternalFuture<Set<GroupPartitionId>> task0;
+        IgniteInternalFuture<?> task0;
 
         if (parts.isEmpty() && !withMetaStorage)
             task0 = new GridFinishedFuture<>(Collections.emptySet());
@@ -650,7 +716,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 withMetaStorage,
                 locSndrFactory.apply(req.snapshotName()));
 
-            if (withMetaStorage) {
+            if (withMetaStorage && task0 instanceof SnapshotFutureTask) {
                 ((DistributedMetaStorageImpl)cctx.kernalContext().distributedMetastorage())
                     .suspend(((SnapshotFutureTask)task0).started());
             }
@@ -681,7 +747,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                     cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
                     grpIds,
                     blts,
-                    fut.result());
+                    (Set<GroupPartitionId>)fut.result());
 
                 try (OutputStream out = new BufferedOutputStream(new FileOutputStream(smf))) {
                     U.marshal(marsh, meta, out);
@@ -903,12 +969,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /**
      * Check if the cache or group with the specified name is currently being restored from the snapshot.
      *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param ccfg Cache configuration.
      * @return {@code True} if the cache or group with the specified name is being restored.
      */
-    public boolean isRestoring(String cacheName, @Nullable String grpName) {
-        return restoreCacheGrpProc.isRestoring(cacheName, grpName);
+    public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+        return restoreCacheGrpProc.isRestoring(ccfg);
     }
 
     /**
@@ -980,7 +1045,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         busyLock.enterBusy();
 
         try {
-            for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+            for (AbstractSnapshotFutureTask<?> sctx : locSnpTasks.values()) {
                 if (sctx.snapshotName().equals(name))
                     sctx.cancel();
             }
@@ -1415,7 +1480,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
         SnapshotOperationRequest snpReq = clusterSnpReq;
 
-        SnapshotFutureTask task = locSnpTasks.get(snpReq.snapshotName());
+        AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
 
         if (task == null)
             return;
@@ -1427,7 +1492,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             try {
                 long start = U.currentTimeMillis();
 
-                task.started().get();
+                ((SnapshotFutureTask)task).started().get();
 
                 if (log.isInfoEnabled()) {
                     log.info("Finished waiting for a synchronized checkpoint under topology lock " +
@@ -1441,12 +1506,45 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     }
 
     /**
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param rmtNodeId The remote node to connect to.
+     * @param partHnd Received partition handler.
+     */
+    public IgniteInternalFuture<Void> requestRemoteSnapshotFiles(
+        UUID rmtNodeId,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        BooleanSupplier stopChecker,
+        BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+    ) throws IgniteCheckedException {
+        assert U.alphanumericUnderscore(snpName) : snpName;
+        assert partHnd != null;
+
+        ClusterNode rmtNode = cctx.discovery().node(rmtNodeId);
+
+        if (rmtNode == null) {
+            throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+        }
+
+        if (!nodeSupports(rmtNode, PERSISTENCE_CACHE_SNAPSHOT))
+            throw new IgniteCheckedException("Snapshot on remote node is not supported: " + rmtNode.id());
+
+        RemoteSnapshotFilesRecevier fut = new RemoteSnapshotFilesRecevier(this, rmtNodeId, snpName, parts, stopChecker, partHnd);
+
+        snpRmtMgr.submit(fut);
+
+        return fut;
+    }
+
+    /**
      * @param grps List of cache groups which will be destroyed.
      */
     public void onCacheGroupsStopped(List<Integer> grps) {
-        for (SnapshotFutureTask sctx : locSnpTasks.values()) {
+        for (AbstractSnapshotFutureTask<?> sctx : F.view(locSnpTasks.values(), t -> t instanceof SnapshotFutureTask)) {
             Set<Integer> retain = new HashSet<>(grps);
-            retain.retainAll(sctx.affectedCacheGroups());
+
+            retain.retainAll(((SnapshotFutureTask)sctx).affectedCacheGroups());
 
             if (!retain.isEmpty()) {
                 sctx.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
@@ -1572,59 +1670,63 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
      * @param snpSndr Factory which produces snapshot receiver instance.
      * @return Snapshot operation task which should be registered on checkpoint to run.
      */
-    SnapshotFutureTask registerSnapshotTask(
+    AbstractSnapshotFutureTask<?> registerSnapshotTask(
         String snpName,
         UUID srcNodeId,
         Map<Integer, Set<Integer>> parts,
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
-        if (!busyLock.enterBusy()) {
-            return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
-        }
-
-        try {
-            if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+        AbstractSnapshotFutureTask<?> task = registerTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName,
+            tmpWorkDir, ioFactory, snpSndr, parts, withMetaStorage, locBuff));
 
-            SnapshotFutureTask snpFutTask;
+        if (!withMetaStorage) {
+            for (Integer grpId : parts.keySet()) {
+                if (!cctx.cache().isEncrypted(grpId))
+                    continue;
 
-            SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
-                snpFutTask = new SnapshotFutureTask(cctx,
-                    srcNodeId,
-                    snpName,
-                    tmpWorkDir,
-                    ioFactory,
-                    snpSndr,
-                    parts,
-                    withMetaStorage,
-                    locBuff));
+                task.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
+                    " but doesn't include metastore. Metastore is required because it contains encryption keys " +
+                    "required to start with encrypted caches contained in the snapshot."));
 
-            if (prev != null)
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+                return task;
+            }
+        }
 
-            if (!withMetaStorage) {
-                for (Integer grpId : parts.keySet()) {
-                    if (!cctx.cache().isEncrypted(grpId))
-                        continue;
+        return task;
+    }
 
-                    snpFutTask.onDone(new IgniteCheckedException("Snapshot contains encrypted cache group " + grpId +
-                        " but doesn't include metastore. Metastore is required because it contains encryption keys " +
-                        "required to start with encrypted caches contained in the snapshot."));
+    /**
+     * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+     */
+    private AbstractSnapshotFutureTask<?> registerTask(String rqId, AbstractSnapshotFutureTask<?> task) {
+        if (!busyLock.enterBusy()) {
+            return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+                cctx.localNodeId() + ']'));
+        }
 
-                    return snpFutTask;
-                }
+        try {
+            if (locSnpTasks.containsKey(rqId)) {
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
             }
 
+            AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, task);
+
+            if (prev != null)
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
+
             if (log.isInfoEnabled()) {
                 log.info("Snapshot task has been registered on local node [sctx=" + this +
+                    ", task=" + task.getClass().getSimpleName() +
                     ", topVer=" + cctx.discovery().topologyVersionEx() + ']');
             }
 
-            snpFutTask.listen(f -> locSnpTasks.remove(snpName));
+            task.listen(f -> locSnpTasks.remove(rqId));
 
-            return snpFutTask;
+            return task;
         }
         finally {
             busyLock.leaveBusy();
@@ -1645,6 +1747,26 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         return locSndrFactory;
     }
 
+    /**
+     * @param factory Factory which produces {@link RemoteSnapshotSender} implementation.
+     */
+    void remoteSnapshotSenderFactory(BiFunction<String, UUID, SnapshotSender> factory) {
+        rmtSndrFactory = factory;
+    }
+
+    /**
+     * @param rqId Request id.
+     * @param nodeId Node id.
+     * @return Snapshot sender related to given node id.
+     */
+    RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
+        return new RemoteSnapshotSender(log,
+            snpRunner,
+            databaseRelativePath(pdsSettings.folderName()),
+            cctx.gridIO().openTransmissionSender(nodeId, DFLT_INITIAL_SNAPSHOT_TOPIC),
+            rqId);
+    }
+
     /** Snapshot finished successfully or already restored. Key can be removed. */
     private void removeLastMetaStorageKey() throws IgniteCheckedException {
         cctx.database().checkpointReadLock();
@@ -1700,6 +1822,18 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     }
 
     /**
+     * @param nodeId Remote node id on which requests has been registered.
+     * @return Snapshot future related to given node id.
+     */
+    AbstractSnapshotFutureTask<?> lastScheduledSnapshotResponseRemoteTask(UUID nodeId) {
+        return locSnpTasks.values().stream()
+            .filter(t -> t instanceof SnapshotResponseRemoteFutureTask)
+            .filter(t -> t.sourceNodeId().equals(nodeId))
+            .findFirst()
+            .orElse(null);
+    }
+
+    /**
      * @return Relative configured path of persistence data storage directory for the local node.
      * Example: {@code snapshotWorkDir/db/IgniteNodeName0}
      */
@@ -1883,6 +2017,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 return new SnapshotHandlerResult<>(hnd.invoke(ctx), null, ctx.localNode());
             }
             catch (Exception e) {
+                U.error(null, "Error invoking snapshot handler", e);
+
                 return new SnapshotHandlerResult<>(null, e, ctx.localNode());
             }
         }
@@ -2115,6 +2251,642 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Unique request id; also names the temporary directory the requested partition files are received into. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which shows how many partitions are left to be received ({@code -1} until the first file meta arrives). */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * is processed asynchronously, but requests are executed strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            if (stopping) {
+                next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                if (active != null)
+                    active.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                RemoteSnapshotFilesRecevier r;
+
+                while ((r = queue.poll()) != null)
+                    r.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+
+                return;
+            }
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>(queue);
+
+            RemoteSnapshotFilesRecevier active0 = active;
+
+            if (active0 != null)
+                futs.add(active0);
+
+            return futs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    try {
+                        synchronized (this) {
+                            AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                            if (task != null) {
+                                // Task will also be removed from local map due to the listener on future done.
+                                task.cancel();
+
+                                log.info("Snapshot request has been cancelled due to another request received " +
+                                    "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                            }
+                        }
+
+                        AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                            new SnapshotResponseRemoteFutureTask(cctx,
+                                nodeId,
+                                snpName,
+                                tmpWorkDir,
+                                ioFactory,
+                                rmtSndrFactory.apply(rqId, nodeId),
+                                reqMsg0.parts()));
+
+                        task.listen(f -> {
+                            if (f.error() == null)
+                                return;
+
+                            U.error(log, "Failed to process request of creating a snapshot " +
+                                "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                            try {
+                                cctx.gridIO().sendToCustomTopic(nodeId,
+                                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                    new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                    SYSTEM_POOL);
+                            }
+                            catch (IgniteCheckedException ex0) {
+                                U.error(log, "Fail to send the response message with processing snapshot request " +
+                                    "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                            }
+                        });
+
+                        task.start();
+                    }
+                    catch (Throwable t) {
+                        U.error(log, "Error processing snapshot file request message " +
+                            "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', t);
+
+                        cctx.gridIO().sendToCustomTopic(nodeId,
+                            DFLT_INITIAL_SNAPSHOT_TOPIC,
+                            new SnapshotFilesFailureMessage(reqMsg0.requestId(), t.getMessage()),
+                            SYSTEM_POOL);
+                    }
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEnd(UUID nodeId) {
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null)
+                return;
+
+            assert task.partsLeft.get() == 0 : task;
+            assert task.rmtNodeId.equals(nodeId);
+
+            if (log.isInfoEnabled()) {
+                log.info("Requested snapshot from remote node has been fully received " +
+                    "[rqId=" + task.reqId + ", task=" + task + ']');
+            }
+
+            task.onDone((Void)null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onException(UUID nodeId, Throwable ex) {
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null)
+                return;
+
+            assert task.rmtNodeId.equals(nodeId);
+
+            task.acceptException(ex);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+            Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM);
+            String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM);
+            String cacheDirName = (String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM);
+
+            String rqId = (String)fileMeta.params().get(RQ_ID_NAME_PARAM);
+            Integer partsCnt = (Integer)fileMeta.params().get(SNP_PARTITIONS_CNT);
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + fileMeta + ", task=" + task + ']');
+            }
+
+            assert task.reqId.equals(rqId) && task.rmtNodeId.equals(nodeId) :
+                "Another transmission in progress [task=" + task + ", nodeId=" + nodeId + ']';
+            // The leaveBusy() call in the finally block below must only follow a successful enterBusy().
+            if (!busyLock.enterBusy())
+                throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+            try {
+                task.partsLeft.compareAndSet(-1, partsCnt);
+
+                File cacheDir = U.resolveWorkDirectory(task.dir.toString(),
+                    Paths.get(rmtDbNodePath, cacheDirName).toString(),
+                    false);
+
+                return Paths.get(cacheDir.getAbsolutePath(), getPartitionFileName(partId)).toString();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) {
+            throw new UnsupportedOperationException("Loading file by chunks is not supported: " + nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+            Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM);
+            Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM);
+            String rqId = (String)initMeta.params().get(RQ_ID_NAME_PARAM);
+
+            assert grpId != null;
+            assert partId != null;
+            assert rqId != null;
+
+            RemoteSnapshotFilesRecevier task = active;
+
+            if (task == null || task.isDone() || !task.reqId.equals(rqId)) {
+                throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " +
+                    "[rqId=" + rqId + ", meta=" + initMeta + ", task=" + task + ']');
+            }
+
+            return new Consumer<File>() {
+                @Override public void accept(File file) {
+                    RemoteSnapshotFilesRecevier task0 = active;
+
+                    if (task0 == null || !task0.equals(task) || task0.isDone()) {
+                        throw new TransmissionCancelledException("Snapshot request is cancelled [rqId=" + rqId +
+                            ", grpId=" + grpId + ", partId=" + partId + ']');
+                    }
+                    // The leaveBusy() call in the finally block below must only follow a successful enterBusy().
+                    if (!busyLock.enterBusy())
+                        throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+                    try {
+                        if (stopping)
+                            throw new IgniteException(SNP_NODE_STOPPING_ERR_MSG);
+
+                        task0.acceptFile(file);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            };
+        }
+    }
+
+    /**
+     * This executor does not bind tasks to a single thread: tasks may run on different threads of the
+     * wrapped executor, but strictly one after another. This is important for some {@link SnapshotSender}s,
+     * which must process their sub-tasks sequentially because all these sub-tasks may share a single socket
+     * channel to send data to.
+     */
+    private static class SequentialExecutorWrapper implements Executor {
+        /** Ignite logger. */
+        private final IgniteLogger log;
+
+        /** Queue of task to execute. */
+        private final Queue<Runnable> tasks = new ArrayDeque<>();
+
+        /** Delegate executor. */
+        private final Executor executor;
+
+        /** Currently running task. */
+        private volatile Runnable active;
+
+        /** If wrapped executor is shutting down. */
+        private volatile boolean stopping;
+
+        /**
+         * @param executor Executor to run tasks on.
+         */
+        public SequentialExecutorWrapper(IgniteLogger log, Executor executor) {
+            this.log = log.getLogger(SequentialExecutorWrapper.class);
+            this.executor = executor;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void execute(final Runnable r) {
+            assert !stopping : "Task must be cancelled prior to the wrapped executor is shutting down.";
+
+            tasks.offer(() -> {
+                try {
+                    r.run();
+                }
+                finally {
+                    scheduleNext();
+                }
+            });
+
+            if (active == null)
+                scheduleNext();
+        }
+
+        /** */
+        private synchronized void scheduleNext() {
+            if ((active = tasks.poll()) != null) {
+                try {
+                    executor.execute(active);
+                }
+                catch (RejectedExecutionException e) {
+                    tasks.clear();
+
+                    stopping = true;
+
+                    log.warning("Task is outdated. Wrapped executor is shutting down.", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Snapshot sender which sends partition files through a {@link GridIoManager.TransmissionSender}.
+     */
+    private static class RemoteSnapshotSender extends SnapshotSender {
+        /** The sender which sends files to remote node. */
+        private final GridIoManager.TransmissionSender sndr;
+
+        /** Value sent as the {@code RQ_ID_NAME_PARAM} transmission parameter (logged as snpName on close). */
+        private final String rqId;
+
+        /** Local node persistent directory with consistent id. */
+        private final String relativeNodePath;
+
+        /** The number of cache partition files expected to be processed. */
+        private int partsCnt;
+
+        /**
+         * @param log Ignite logger.
+         * @param sndr File sender instance.
+         * @param rqId Value sent with each file as the {@code RQ_ID_NAME_PARAM} transmission parameter.
+         */
+        public RemoteSnapshotSender(
+            IgniteLogger log,
+            Executor exec,
+            String relativeNodePath,
+            GridIoManager.TransmissionSender sndr,
+            String rqId
+        ) {
+            super(log, new SequentialExecutorWrapper(log, exec));
+
+            this.sndr = sndr;
+            this.rqId = rqId;
+            this.relativeNodePath = relativeNodePath;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void init(int partsCnt) {
+            this.partsCnt = partsCnt;
+
+            if (F.isEmpty(relativeNodePath))
+                throw new IgniteException("Relative node path cannot be empty.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long len) {
+            try {
+                assert part.exists();
+                assert len > 0 : "Requested partitions has incorrect file length " +
+                    "[pair=" + pair + ", cacheDirName=" + cacheDirName + ']';
+
+                sndr.send(part, 0, len, transmissionParams(rqId, cacheDirName, pair), TransmissionPolicy.FILE);
+
+                if (log.isInfoEnabled()) {
+                    log.info("Partition file has been send [part=" + part.getName() + ", pair=" + pair +
+                        ", length=" + len + ']');
+                }
+            }
+            catch (TransmissionCancelledException e) { // Cancellation is only logged, not rethrown.
+                if (log.isInfoEnabled()) {
+                    log.info("Transmission partition file has been interrupted [part=" + part.getName() +
+                        ", pair=" + pair + ']');
+                }
+            }
+            catch (IgniteCheckedException | InterruptedException | IOException e) {
+                U.error(log, "Error sending partition file [part=" + part.getName() + ", pair=" + pair +
+                    ", length=" + len + ']', e);
+
+                throw new IgniteException(e); // NOTE(review): the interrupt flag is not restored here.
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+            throw new UnsupportedOperationException("Sending files by chunks of data is not supported: " + delta.getAbsolutePath());
+        }
+
+        /**
+         * @param cacheDirName Cache directory name.
+         * @param pair Cache group id with corresponding partition id.
+         * @return Map of transmission params describing the partition file being sent.
+         */
+        private Map<String, Serializable> transmissionParams(String rqId, String cacheDirName,
+            GroupPartitionId pair) {
+            Map<String, Serializable> params = new HashMap<>();
+
+            params.put(SNP_GRP_ID_PARAM, pair.getGroupId());
+            params.put(SNP_PART_ID_PARAM, pair.getPartitionId());
+            params.put(SNP_DB_NODE_PATH_PARAM, relativeNodePath);
+            params.put(SNP_CACHE_DIR_NAME_PARAM, cacheDirName);
+            params.put(RQ_ID_NAME_PARAM, rqId);
+            params.put(SNP_PARTITIONS_CNT, partsCnt);
+
+            return params;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close0(@Nullable Throwable th) {
+            U.closeQuiet(sndr);
+
+            if (th == null) {
+                if (log.isInfoEnabled())
+                    log.info("The remote snapshot sender closed normally [snpName=" + rqId + ']');
+            }
+            else {
+                U.warn(log, "The remote snapshot sender closed due to an error occurred while processing " +
+                    "snapshot operation [snpName=" + rqId + ']', th);
+            }
+        }
+    }
+
     /**
      * Snapshot sender which writes all data to local directory.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
index bcaea42..ac42ccd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Compound snapshot verification exception from the nodes where the verification process executed.
@@ -36,6 +37,8 @@ public class IgniteSnapshotVerifyException extends IgniteException {
      * @param map Map of received exceptions.
      */
     public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
+        super(F.first(map.values())); // Presumably the first received exception (if any) becomes the cause.
+
         exs.putAll(map);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
new file mode 100644
index 0000000..3cb9056
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message indicating that a failure occurred while processing a snapshot files request.
+ */
+public class SnapshotFilesFailureMessage extends AbstractSnapshotMessage {
+    /** Snapshot response message type (value is {@code 179}). */
+    public static final short TYPE_CODE = 179;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message of the error which occurred during snapshot request processing. */
+    private String errMsg;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public SnapshotFilesFailureMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Id of the request this response relates to.
+     * @param errMsg Response error message.
+     */
+    public SnapshotFilesFailureMessage(String reqId, String errMsg) {
+        super(reqId);
+
+        this.errMsg = errMsg;
+    }
+
+    /**
+     * @return Response error message.
+     */
+    public String errorMessage() {
+        return errMsg;
+    }
+
+    /**
+     * @param errMsg Response error message.
+     * @return {@code this} for chaining.
+     */
+    public SnapshotFilesFailureMessage errorMessage(String errMsg) {
+        this.errMsg = errMsg;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 1) { // State 0 is presumably handled by super.writeTo() above.
+            if (!writer.writeString("errMsg", errMsg))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (reader.state() == 1) { // State 0 is presumably handled by super.readFrom() above.
+            errMsg = reader.readString("errMsg");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SnapshotFilesFailureMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2; // Own errMsg field (state 1) plus the state 0 field.
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotFilesFailureMessage.class, this, super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
new file mode 100644
index 0000000..65d1345
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request message carrying the snapshot name and the cache group partitions demanded from that snapshot.
+ */
+public class SnapshotFilesRequestMessage extends AbstractSnapshotMessage {
+    /** Snapshot request message type (value is {@code 178}). */
+    public static final short TYPE_CODE = 178;
+
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Snapshot name to request. */
+    private String snpName;
+
+    /** Map of cache group ids and corresponding array of its partition ids. */
+    @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+    private Map<Integer, int[]> parts;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public SnapshotFilesRequestMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Unique request id.
+     * @param snpName Snapshot name.
+     * @param parts Map of cache group ids and corresponding set of its partition ids to be snapshot.
+     */
+    public SnapshotFilesRequestMessage(String reqId, String snpName, Map<Integer, Set<Integer>> parts) {
+        super(reqId);
+
+        assert parts != null && !parts.isEmpty();
+
+        this.snpName = snpName;
+        this.parts = new HashMap<>();
+
+        for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+            this.parts.put(e.getKey(), U.toIntArray(e.getValue()));
+    }
+
+    /**
+     * @return The demanded partitions per cache group; an empty partition array is returned as {@code null}.
+     */
+    public Map<Integer, Set<Integer>> parts() {
+        Map<Integer, Set<Integer>> res = new HashMap<>();
+
+        for (Map.Entry<Integer, int[]> e : parts.entrySet()) {
+            res.put(e.getKey(), e.getValue().length == 0 ? null : Arrays.stream(e.getValue())
+                .boxed()
+                .collect(Collectors.toSet()));
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Requested snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        if (writer.state() == 1) { // State 0 is presumably handled by super.writeTo() above.
+            if (!writer.writeString("snpName", snpName))
+                return false;
+
+            writer.incrementState();
+        }
+
+        if (writer.state() == 2) {
+            if (!writer.writeMap("parts", parts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
+                return false;
+
+            writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        if (reader.state() == 1) { // Fields are read in the same order writeTo() writes them.
+            snpName = reader.readString("snpName");
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        if (reader.state() == 2) {
+            parts = reader.readMap("parts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
+
+            if (!reader.isLastRead())
+                return false;
+
+            reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SnapshotFilesRequestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3; // Own snpName and parts fields (states 1 and 2) plus the state 0 field.
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotFilesRequestMessage.class, this, super.toString());
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
index bcaea42..561bb16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotVerifyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFinishedFutureTask.java
@@ -17,32 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-
-/**
- * Compound snapshot verification exception from the nodes where the verification process executed.
- */
-public class IgniteSnapshotVerifyException extends IgniteException {
-    /** Serial version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /** Map of received exceptions. */
-    private final Map<ClusterNode, Exception> exs = new HashMap<>();
+import org.apache.ignite.IgniteCheckedException;
 
+/** */
+public class SnapshotFinishedFutureTask extends AbstractSnapshotFutureTask<Void> {
     /**
-     * @param map Map of received exceptions.
+     * @param e Finished snapshot task future with particular exception.
      */
-    public IgniteSnapshotVerifyException(Map<ClusterNode, ? extends Exception> map) {
-        exs.putAll(map);
+    public SnapshotFinishedFutureTask(IgniteCheckedException e) {
+        super(null, null, null, null, null, null, null);
+
+        onDone(e);
     }
 
-    /**
-     * @return Map of received exceptions.
-     */
-    public Map<ClusterNode, Exception> exceptions() {
-        return exs;
+    /** {@inheritDoc} */
+    @Override public boolean start() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptException(Throwable th) {
+        onDone(th);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 2fefe35..97f7d72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -47,7 +46,6 @@ import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
@@ -87,33 +85,20 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaFile;
 
 /**
- *
+ * The requested cache groups and their partitions to include into a snapshot are represented as
+ * {@code Map<Integer, Set<Integer>>}. If the set of partitions is {@code null} then all OWNING partitions of
+ * the given cache group are included. In this case, if all partitions are OWNING, the index partition is included too.
+ * <p>
+ * If partitions for a particular cache group are not provided then they will be collected and added
+ * on checkpoint under the write-lock.
  */
-class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implements CheckpointListener {
-    /** Shared context. */
-    private final GridCacheSharedContext<?, ?> cctx;
-
+class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId>> implements CheckpointListener {
     /** File page store manager for accessing cache group associated files. */
     private final FilePageStoreManager pageStore;
 
-    /** Ignite logger. */
-    private final IgniteLogger log;
-
-    /** Node id which cause snapshot operation. */
-    private final UUID srcNodeId;
-
-    /** Unique identifier of snapshot process. */
-    private final String snpName;
-
-    /** Snapshot working directory on file system. */
-    private final File tmpSnpWorkDir;
-
     /** Local buffer to perform copy-on-write operations for {@link PageStoreSerialWriter}. */
     private final ThreadLocal<ByteBuffer> locBuff;
 
-    /** IO factory which will be used for creating snapshot delta-writers. */
-    private final FileIOFactory ioFactory;
-
     /**
      * The length of file size per each cache partition file.
      * Partition has value greater than zero only for partitions in OWNING state.
@@ -135,20 +120,6 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
      */
     private final List<CacheConfigurationSender> ccfgSndrs = new CopyOnWriteArrayList<>();
 
-    /** Snapshot data sender. */
-    @GridToStringExclude
-    private final SnapshotSender snpSndr;
-
-    /**
-     * Requested map of cache groups and its partitions to include into snapshot. If array of partitions
-     * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
-     * In this case if all of partitions have OWNING state the index partition also will be included.
-     * <p>
-     * If partitions for particular cache group are not provided that they will be collected and added
-     * on checkpoint under the write lock.
-     */
-    private final Map<Integer, Set<Integer>> parts;
-
     /** {@code true} if all metastorage data must be also included into snapshot. */
     private final boolean withMetaStorage;
 
@@ -167,38 +138,16 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
     /** Future which will be completed when task requested to be closed. Will be executed on system pool. */
     private volatile CompletableFuture<Void> closeFut;
 
-    /** An exception which has been occurred during snapshot processing. */
-    private final AtomicReference<Throwable> err = new AtomicReference<>();
-
     /** Flag indicates that task already scheduled on checkpoint. */
     private final AtomicBoolean started = new AtomicBoolean();
 
     /**
-     * @param e Finished snapshot task future with particular exception.
-     */
-    public SnapshotFutureTask(IgniteCheckedException e) {
-        assert e != null : "Exception for a finished snapshot task must be not null";
-
-        cctx = null;
-        pageStore = null;
-        log = null;
-        snpName = null;
-        srcNodeId = null;
-        tmpSnpWorkDir = null;
-        snpSndr = null;
-
-        err.set(e);
-        startedFut.onDone(e);
-        onDone(e);
-        parts = null;
-        withMetaStorage = false;
-        ioFactory = null;
-        locBuff = null;
-    }
-
-    /**
-     * @param snpName Unique identifier of snapshot task.
-     * @param ioFactory Factory to working with delta as file storage.
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which cause snapshot task creation.
+     * @param snpName Unique identifier of snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory to working with snapshot files.
+     * @param snpSndr Factory which produces snapshot receiver instance.
      * @param parts Map of cache groups and its partitions to include into snapshot, if set of partitions
      * is {@code null} than all OWNING partitions for given cache groups will be included into snapshot.
      */
@@ -213,47 +162,20 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
         boolean withMetaStorage,
         ThreadLocal<ByteBuffer> locBuff
     ) {
+        super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+
         assert snpName != null : "Snapshot name cannot be empty or null.";
         assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
         assert snpSndr.executor() != null : "Executor service must be not null.";
         assert cctx.pageStore() instanceof FilePageStoreManager : "Snapshot task can work only with physical files.";
         assert !parts.containsKey(MetaStorage.METASTORAGE_CACHE_ID) : "The withMetaStorage must be used instead.";
 
-        this.parts = parts;
         this.withMetaStorage = withMetaStorage;
-        this.cctx = cctx;
         this.pageStore = (FilePageStoreManager)cctx.pageStore();
-        this.log = cctx.logger(SnapshotFutureTask.class);
-        this.snpName = snpName;
-        this.srcNodeId = srcNodeId;
-        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
-        this.snpSndr = snpSndr;
-        this.ioFactory = ioFactory;
         this.locBuff = locBuff;
     }
 
     /**
-     * @return Snapshot name.
-     */
-    public String snapshotName() {
-        return snpName;
-    }
-
-    /**
-     * @return Node id which triggers this operation.
-     */
-    public UUID sourceNodeId() {
-        return srcNodeId;
-    }
-
-    /**
-     * @return Type of snapshot operation.
-     */
-    public Class<? extends SnapshotSender> type() {
-        return snpSndr.getClass();
-    }
-
-    /**
      * @return Set of cache groups included into snapshot operation.
      */
     public Set<Integer> affectedCacheGroups() {
@@ -263,7 +185,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
     /**
      * @param th An exception which occurred during snapshot processing.
      */
-    public void acceptException(Throwable th) {
+    @Override public void acceptException(Throwable th) {
         if (th == null)
             return;
 
@@ -323,7 +245,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
      *
      * @return {@code true} if task started by this call.
      */
-    public boolean start() {
+    @Override public boolean start() {
         if (stopping())
             return false;
 
@@ -676,8 +598,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
 
     /** {@inheritDoc} */
     @Override public boolean cancel() {
-        acceptException(new IgniteFutureCancelledCheckedException("Snapshot operation has been cancelled " +
-            "by external process [snpName=" + snpName + ']'));
+        super.cancel();
 
         try {
             closeAsync().get();
@@ -725,7 +646,7 @@ class SnapshotFutureTask extends GridFutureAdapter<Set<GroupPartitionId>> implem
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(SnapshotFutureTask.class, this);
+        return S.toString(SnapshotFutureTask.class, this, super.toString());
     }
 
     /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index f5fb6cba..20cb5d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
+import java.io.IOException;
+import java.io.InvalidObjectException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +31,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Snapshot metadata file.
@@ -67,7 +71,7 @@ public class SnapshotMetadata implements Serializable {
      * since for instance, due to the node filter there is no cache data on node.
      */
     @GridToStringInclude
-    private final Map<Integer, Set<Integer>> locParts = new HashMap<>();
+    private transient Map<Integer, Set<Integer>> locParts = new HashMap<>();
 
     /**
      * @param rqId Unique snapshot request id.
@@ -155,7 +159,56 @@ public class SnapshotMetadata implements Serializable {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return Collections.unmodifiableMap(locParts);
+    }
+
+    /** Writes default fields, then {@code locParts}: size, then (group id, set size, partition ids) per entry. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out each cache group id followed by the number of its partitions and the partition ids.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reads default fields, then rebuilds {@code locParts}: size, then (group id, set size, partition ids) per entry. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = U.newHashMap(size);
+
+        // Read each cache group id followed by the number of its partitions and the partition ids.
+        for (int i = 0; i < size; i++) {
+            int grpId = s.readInt();
+            int total = s.readInt();
+
+            if (total < 0)
+                throw new InvalidObjectException("Illegal size: " + total);
+
+            Set<Integer> parts = U.newHashSet(total);
+
+            for (int k = 0; k < total; k++)
+                parts.add(s.readInt());
+
+            locParts.put(grpId, parts);
+        }
+    }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
index a227f66..6b4fba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -103,7 +104,8 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
             if (!grps.remove(grpId))
                 continue;
 
-            Set<Integer> parts = new HashSet<>(meta.partitions().get(grpId));
+            Set<Integer> parts = meta.partitions().get(grpId) == null ? Collections.emptySet() :
+                new HashSet<>(meta.partitions().get(grpId));
 
             for (File part : cachePartitionFiles(dir)) {
                 int partId = partId(part.getName());
@@ -213,6 +215,11 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part
                 }
             );
         }
+        catch (Throwable t) {
+            log.error("Error executing handler: ", t);
+
+            throw t;
+        }
         finally {
             for (GridComponent comp : snpCtx)
                 comp.stop(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
new file mode 100644
index 0000000..4894032
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirectory;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+
+/** Snapshot task which sends the requested partition files of a local snapshot via the given snapshot sender. */
+public class SnapshotResponseRemoteFutureTask extends AbstractSnapshotFutureTask<Void> {
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Id of the remote node which caused the snapshot task creation.
+     * @param snpName Unique identifier of the snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory for working with snapshot files.
+     * @param snpSndr Snapshot sender used to send the partition files.
+     */
+    public SnapshotResponseRemoteFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        super(cctx, srcNodeId, snpName, tmpWorkDir, ioFactory, snpSndr, parts);
+    }
+
+    /** {@inheritDoc} Returns {@code false} when {@code parts} is empty or when starting the task fails. */
+    @Override public boolean start() {
+        if (F.isEmpty(parts))
+            return false;
+
+        try {
+            List<GroupPartitionId> handled = new ArrayList<>();
+
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                ofNullable(e.getValue()).orElse(Collections.emptySet())
+                    .forEach(p -> handled.add(new GroupPartitionId(e.getKey(), p)));
+            }
+
+            snpSndr.init(handled.size()); // Total number of requested (group, partition) pairs.
+
+            File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName);
+
+            List<CompletableFuture<Void>> futs = new ArrayList<>();
+            List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(snpName);
+
+            for (SnapshotMetadata meta : metas) {
+                Map<Integer, Set<Integer>> parts0 = meta.partitions();
+
+                if (F.isEmpty(parts0))
+                    continue;
+
+                handled.removeIf(gp -> { // A pair leaves 'handled' once a send task is scheduled for it.
+                    if (ofNullable(parts0.get(gp.getGroupId()))
+                        .orElse(Collections.emptySet())
+                        .contains(gp.getPartitionId())
+                    ) {
+                        futs.add(CompletableFuture.runAsync(() -> {
+                                if (err.get() != null)
+                                    return; // Skip sending if an error has already been recorded.
+
+                                File cacheDir = cacheDirectory(new File(snpDir, databaseRelativePath(meta.folderName())),
+                                    gp.getGroupId());
+
+                                if (cacheDir == null) {
+                                    throw new IgniteException("Cache directory not found [snpName=" + snpName + ", meta=" + meta +
+                                        ", pair=" + gp + ']');
+                                }
+
+                                File snpPart = getPartitionFile(cacheDir.getParentFile(), cacheDir.getName(), gp.getPartitionId());
+
+                                if (!snpPart.exists()) {
+                                    throw new IgniteException("Snapshot partition file not found [cacheDir=" + cacheDir +
+                                        ", pair=" + gp + ']');
+                                }
+
+                                snpSndr.sendPart(snpPart, cacheDir.getName(), gp, snpPart.length());
+                            },
+                            snpSndr.executor())
+                            .whenComplete((r, t) -> err.compareAndSet(null, t))); // Only the first error is kept.
+
+                        return true;
+                    }
+
+                    return false;
+                });
+            }
+
+            if (!handled.isEmpty()) { // Some requested pairs were not found in any local snapshot metadata.
+                err.compareAndSet(null, new IgniteException("Snapshot partitions missed on local node [snpName=" + snpName +
+                    ", missed=" + handled + ']'));
+            }
+
+            int size = futs.size();
+
+            CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+                .whenComplete((r, t) -> {
+                    Throwable th = ofNullable(err.get()).orElse(t); // A recorded error takes precedence.
+
+                    if (th == null && log.isInfoEnabled()) {
+                        log.info("Snapshot partitions have been sent to the remote node [snpName=" + snpName +
+                            ", rmtNodeId=" + srcNodeId + ']');
+                    }
+
+                    close(th);
+                });
+
+            return true;
+        }
+        catch (Throwable t) {
+            if (err.compareAndSet(null, t))
+                close(t);
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptException(Throwable th) {
+        if (err.compareAndSet(null, th))
+            close(th);
+    }
+
+    /**
+     * @param th Error to close the sender and complete this future with, {@code null} on success.
+     */
+    private void close(@Nullable Throwable th) {
+        if (th == null) {
+            snpSndr.close(null);
+            onDone((Void)null);
+        }
+        else {
+            snpSndr.close(th);
+            onDone(th);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 99a730a..c796299 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -26,30 +29,38 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
@@ -62,16 +73,23 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.internal.IgniteFeatures.SNAPSHOT_RESTORE_CACHE_GROUP;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.databaseRelativePath;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_ROLLBACK;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
@@ -96,7 +114,10 @@ public class SnapshotRestoreProcess {
     private final GridKernalContext ctx;
 
     /** Cache group restore prepare phase. */
-    private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
+
+    /** Cache group restore preload partitions phase. */
+    private final DistributedProcess<UUID, Boolean> preloadProc;
 
     /** Cache group restore cache start phase. */
     private final DistributedProcess<UUID, Boolean> cacheStartProc;
@@ -124,6 +145,9 @@ public class SnapshotRestoreProcess {
         prepareRestoreProc = new DistributedProcess<>(
             ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, this::prepare, this::finishPrepare);
 
+        preloadProc = new DistributedProcess<>(
+            ctx, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, this::preload, this::finishPreload);
+
         cacheStartProc = new DistributedProcess<>(
             ctx, RESTORE_CACHE_GROUP_SNAPSHOT_START, this::cacheStart, this::finishCacheStart);
 
@@ -248,19 +272,16 @@ public class SnapshotRestoreProcess {
                 cacheGrpNames.stream().collect(Collectors.toMap(CU::cacheId, v -> v));
 
             for (Map.Entry<ClusterNode, List<SnapshotMetadata>> entry : metas.entrySet()) {
-                SnapshotMetadata meta = F.first(entry.getValue());
-
-                assert meta != null : entry.getKey().id();
-
-                if (!entry.getKey().consistentId().toString().equals(meta.consistentId()))
-                    continue;
+                dataNodes.add(entry.getKey().id());
 
-                if (snpBltNodes == null)
-                    snpBltNodes = new HashSet<>(meta.baselineNodes());
+                for (SnapshotMetadata meta : entry.getValue()) {
+                    assert meta != null : entry.getKey().id();
 
-                dataNodes.add(entry.getKey().id());
+                    if (snpBltNodes == null)
+                        snpBltNodes = new HashSet<>(meta.baselineNodes());
 
-                reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+                    reqGrpIds.keySet().removeAll(meta.partitions().keySet());
+                }
             }
 
             if (snpBltNodes == null) {
@@ -277,20 +298,10 @@ public class SnapshotRestoreProcess {
                 return;
             }
 
-            Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
-                node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
-
-            snpBltNodes.removeAll(bltNodes);
-
-            if (!snpBltNodes.isEmpty()) {
-                finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
-                    "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
-
-                return;
-            }
+            Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
 
             SnapshotOperationRequest req =
-                new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, dataNodes);
+                new SnapshotOperationRequest(fut0.rqId, F.first(dataNodes), snpName, cacheGrpNames, new HashSet<>(bltNodes));
 
             prepareRestoreProc.start(req.requestId(), req);
         });
@@ -315,28 +326,36 @@ public class SnapshotRestoreProcess {
     }
 
     /**
-     * Check if the cache or group with the specified name is currently being restored from the snapshot.
-     *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param ccfg Cache configuration.
      * @return {@code True} if the cache or group with the specified name is currently being restored.
      */
-    public boolean isRestoring(String cacheName, @Nullable String grpName) {
-        assert cacheName != null;
+    public boolean isRestoring(CacheConfiguration<?, ?> ccfg) {
+        return isRestoring(ccfg, opCtx);
+    }
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+    /**
+     * Check if the cache or group with the specified name is currently being restored from the snapshot.
+     * @param opCtx Restoring context.
+     * @param ccfg Cache configuration.
+     * @return {@code True} if the cache or group with the specified name is currently being restored.
+     */
+    private boolean isRestoring(CacheConfiguration<?, ?> ccfg, @Nullable SnapshotRestoreContext opCtx) {
+        assert ccfg != null;
 
-        if (opCtx0 == null)
+        if (opCtx == null)
             return false;
 
-        Map<Integer, StoredCacheData> cacheCfgs = opCtx0.cfgs;
+        Map<Integer, StoredCacheData> cacheCfgs = opCtx.cfgs;
+
+        String cacheName = ccfg.getName();
+        String grpName = ccfg.getGroupName();
 
         int cacheId = CU.cacheId(cacheName);
 
         if (cacheCfgs.containsKey(cacheId))
             return true;
 
-        for (File grpDir : opCtx0.dirs) {
+        for (File grpDir : opCtx.dirs) {
             String locGrpName = FilePageStoreManager.cacheGroupName(grpDir);
 
             if (grpName != null) {
@@ -364,7 +383,7 @@ public class SnapshotRestoreProcess {
         if (opCtx0 == null || !reqId.globalId().equals(opCtx0.reqId))
             return Collections.emptySet();
 
-        return Collections.unmodifiableSet(opCtx0.nodes);
+        return new HashSet<>(opCtx0.nodes());
     }
 
     /**
@@ -412,7 +431,7 @@ public class SnapshotRestoreProcess {
     public void onNodeLeft(UUID leftNodeId) {
         SnapshotRestoreContext opCtx0 = opCtx;
 
-        if (opCtx0 != null && opCtx0.nodes.contains(leftNodeId)) {
+        if (opCtx0 != null && opCtx0.nodes().contains(leftNodeId)) {
             opCtx0.err.compareAndSet(null, new ClusterTopologyCheckedException(OP_REJECT_MSG +
                 "Required node has left the cluster [nodeId=" + leftNodeId + ']'));
         }
@@ -498,17 +517,18 @@ public class SnapshotRestoreProcess {
      * @param req Request to prepare cache group restore from the snapshot.
      * @return Result future.
      */
-    private IgniteInternalFuture<ArrayList<StoredCacheData>> prepare(SnapshotOperationRequest req) {
+    private IgniteInternalFuture<SnapshotRestoreOperationResponse> prepare(SnapshotOperationRequest req) {
         if (ctx.clientNode())
             return new GridFinishedFuture<>();
 
         try {
             DiscoveryDataClusterState state = ctx.state().clusterState();
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
 
             if (state.state() != ClusterState.ACTIVE || state.transition())
                 throw new IgniteCheckedException(OP_REJECT_MSG + "The cluster should be active.");
 
-            if (ctx.cache().context().snapshotMgr().isSnapshotCreating())
+            if (snpMgr.isSnapshotCreating())
                 throw new IgniteCheckedException(OP_REJECT_MSG + "A cluster snapshot operation is in progress.");
 
             if (ctx.encryption().isMasterKeyChangeInProgress()) {
@@ -530,6 +550,13 @@ public class SnapshotRestoreProcess {
                 }
             }
 
+            if (log.isInfoEnabled()) {
+                log.info("Starting local snapshot prepare restore operation" +
+                    " [reqId=" + req.requestId() +
+                    ", snapshot=" + req.snapshotName() +
+                    ", caches=" + req.groups() + ']');
+            }
+
             SnapshotRestoreContext opCtx0 = prepareContext(req);
 
             synchronized (this) {
@@ -537,13 +564,10 @@ public class SnapshotRestoreProcess {
 
                 ClusterSnapshotFuture fut0 = fut;
 
-                if (fut0 != null && fut0.interruptEx != null)
-                    opCtx0.err.compareAndSet(null, fut0.interruptEx);
+                if (fut0 != null)
+                    opCtx0.errHnd.accept(fut0.interruptEx);
             }
 
-            if (opCtx0.dirs.isEmpty())
-                return new GridFinishedFuture<>();
-
             // Ensure that shared cache groups has no conflicts.
             for (StoredCacheData cfg : opCtx0.cfgs.values()) {
                 ensureCacheAbsent(cfg.config().getName());
@@ -552,46 +576,11 @@ public class SnapshotRestoreProcess {
                     ensureCacheAbsent(cfg.config().getGroupName());
             }
 
-            if (log.isInfoEnabled()) {
-                log.info("Starting local snapshot restore operation" +
-                    " [reqId=" + req.requestId() +
-                    ", snapshot=" + req.snapshotName() +
-                    ", cache(s)=" + F.viewReadOnly(opCtx0.cfgs.values(), data -> data.config().getName()) + ']');
-            }
-
-            Consumer<Throwable> errHnd = (ex) -> opCtx.err.compareAndSet(null, ex);
-            BooleanSupplier stopChecker = () -> opCtx.err.get() != null;
-            GridFutureAdapter<ArrayList<StoredCacheData>> retFut = new GridFutureAdapter<>();
-
             if (ctx.isStopping())
-                throw new NodeStoppingException("Node is stopping.");
-
-            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+                throw new NodeStoppingException("The node is stopping: " + ctx.localNodeId());
 
-            restoreAsync(opCtx0.snpName, opCtx0.dirs, ctx.localNodeId().equals(req.operationalNodeId()), stopChecker, errHnd)
-                .thenAccept(res -> {
-                    try {
-                        Throwable err = opCtx.err.get();
-
-                        if (err != null)
-                            throw err;
-
-                        for (File src : opCtx0.dirs)
-                            Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
-                    }
-                    catch (Throwable t) {
-                        log.error("Unable to restore cache group(s) from the snapshot " +
-                            "[reqId=" + opCtx.reqId + ", snapshot=" + opCtx.snpName + ']', t);
-
-                        retFut.onDone(t);
-
-                        return;
-                    }
-
-                    retFut.onDone(new ArrayList<>(opCtx.cfgs.values()));
-                });
-
-            return retFut;
+            return new GridFinishedFuture<>(new SnapshotRestoreOperationResponse(opCtx0.cfgs.values(),
+                opCtx0.metasPerNode.get(ctx.localNodeId())));
         }
         catch (IgniteIllegalStateException | IgniteCheckedException | RejectedExecutionException e) {
             log.error("Unable to restore cache group(s) from the snapshot " +
@@ -605,85 +594,11 @@ public class SnapshotRestoreProcess {
      * @param cacheDir Cache directory.
      * @return Temporary directory.
      */
-    private File formatTmpDirName(File cacheDir) {
+    private static File formatTmpDirName(File cacheDir) {
         return new File(cacheDir.getParent(), TMP_CACHE_DIR_PREFIX + cacheDir.getName());
     }
 
     /**
-     * Copy partition files and update binary metadata.
-     *
-     * @param snpName Snapshot name.
-     * @param dirs Cache directories to restore from the snapshot.
-     * @param updateMeta Update binary metadata flag.
-     * @param stopChecker Process interrupt checker.
-     * @param errHnd Error handler.
-     * @throws IgniteCheckedException If failed.
-     */
-    private CompletableFuture<Void> restoreAsync(
-        String snpName,
-        Collection<File> dirs,
-        boolean updateMeta,
-        BooleanSupplier stopChecker,
-        Consumer<Throwable> errHnd
-    ) throws IgniteCheckedException {
-        IgniteSnapshotManager snapshotMgr = ctx.cache().context().snapshotMgr();
-        String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
-
-        List<CompletableFuture<Void>> futs = new ArrayList<>();
-
-        if (updateMeta) {
-            File binDir = binaryWorkDir(snapshotMgr.snapshotLocalDir(snpName).getAbsolutePath(), pdsFolderName);
-
-            futs.add(CompletableFuture.runAsync(() -> {
-                try {
-                    ctx.cacheObjects().updateMetadata(binDir, stopChecker);
-                }
-                catch (Throwable t) {
-                    errHnd.accept(t);
-                }
-            }, snapshotMgr.snapshotExecutorService()));
-        }
-
-        for (File cacheDir : dirs) {
-            File tmpCacheDir = formatTmpDirName(cacheDir);
-            File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(snpName),
-                Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
-
-            assert snpCacheDir.exists() : "node=" + ctx.localNodeId() + ", dir=" + snpCacheDir;
-
-            for (File snpFile : snpCacheDir.listFiles()) {
-                futs.add(CompletableFuture.runAsync(() -> {
-                    if (stopChecker.getAsBoolean())
-                        return;
-
-                    try {
-                        if (Thread.interrupted())
-                            throw new IgniteInterruptedCheckedException("Thread has been interrupted.");
-
-                        File target = new File(tmpCacheDir, snpFile.getName());
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("Copying file from the snapshot " +
-                                "[snapshot=" + snpName +
-                                ", src=" + snpFile +
-                                ", target=" + target + "]");
-                        }
-
-                        IgniteSnapshotManager.copy(snapshotMgr.ioFactory(), snpFile, target, snpFile.length());
-                    }
-                    catch (Throwable t) {
-                        errHnd.accept(t);
-                    }
-                }, ctx.cache().context().snapshotMgr().snapshotExecutorService()));
-            }
-        }
-
-        int futsSize = futs.size();
-
-        return CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize]));
-    }
-
-    /**
      * @param req Request to prepare cache group restore from the snapshot.
      * @return Snapshot restore operation context.
      * @throws IgniteCheckedException If failed.
@@ -693,76 +608,76 @@ public class SnapshotRestoreProcess {
             throw new IgniteCheckedException(OP_REJECT_MSG +
                 "The previous snapshot restore operation was not completed.");
         }
-
         GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
 
-        SnapshotMetadata meta = F.first(cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName()));
+        List<SnapshotMetadata> metas = cctx.snapshotMgr().readSnapshotMetadatas(req.snapshotName());
+
+        // Collection of baseline nodes that must survive and additional discovery data required for the affinity calculation.
+        DiscoCache discoCache = ctx.discovery().discoCache();
+
+        if (!F.transform(discoCache.aliveBaselineNodes(), F.node2id()).containsAll(req.nodes()))
+            throw new IgniteCheckedException("Restore context cannot be inited since the required baseline nodes missed: " + discoCache);
+
+        DiscoCache discoCache0 = discoCache.copy(discoCache.version(), null);
 
-        if (meta == null || !meta.consistentId().equals(cctx.localNode().consistentId().toString()))
-            return new SnapshotRestoreContext(req, Collections.emptyList(), Collections.emptyMap());
+        if (F.first(metas) == null)
+            return new SnapshotRestoreContext(req, discoCache0, Collections.emptyMap(), cctx.localNodeId(), Collections.emptyList());
 
-        if (meta.pageSize() != cctx.database().pageSize()) {
+        if (F.first(metas).pageSize() != cctx.database().pageSize()) {
             throw new IgniteCheckedException("Incompatible memory page size " +
-                "[snapshotPageSize=" + meta.pageSize() +
+                "[snapshotPageSize=" + F.first(metas).pageSize() +
                 ", local=" + cctx.database().pageSize() +
                 ", snapshot=" + req.snapshotName() +
                 ", nodeId=" + cctx.localNodeId() + ']');
         }
 
-        List<File> cacheDirs = new ArrayList<>();
         Map<String, StoredCacheData> cfgsByName = new HashMap<>();
         FilePageStoreManager pageStore = (FilePageStoreManager)cctx.pageStore();
 
         // Collect the cache configurations and prepare a temporary directory for copying files.
         // Metastorage can be restored only manually by directly copying files.
-        for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
-            name -> !METASTORAGE_CACHE_NAME.equals(name)))
-        {
-            String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
+        for (SnapshotMetadata meta : metas) {
+            for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), meta.folderName(),
+                name -> !METASTORAGE_CACHE_NAME.equals(name))) {
+                String grpName = FilePageStoreManager.cacheGroupName(snpCacheDir);
 
-            if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
-                continue;
+                if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName))
+                    continue;
 
-            File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
+                File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName);
 
-            if (cacheDir.exists()) {
-                if (!cacheDir.isDirectory()) {
-                    throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
-                        "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
-                }
+                if (cacheDir.exists()) {
+                    if (!cacheDir.isDirectory()) {
+                        throw new IgniteCheckedException("Unable to restore cache group, file with required directory " +
+                            "name already exists [group=" + grpName + ", file=" + cacheDir + ']');
+                    }
 
-                if (cacheDir.list().length > 0) {
-                    throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
-                        "[group=" + grpName + ", dir=" + cacheDir + ']');
-                }
+                    if (cacheDir.list().length > 0) {
+                        throw new IgniteCheckedException("Unable to restore cache group, directory is not empty " +
+                            "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    }
 
-                if (!cacheDir.delete()) {
-                    throw new IgniteCheckedException("Unable to remove empty cache directory " +
-                        "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    if (!cacheDir.delete()) {
+                        throw new IgniteCheckedException("Unable to remove empty cache directory " +
+                            "[group=" + grpName + ", dir=" + cacheDir + ']');
+                    }
                 }
-            }
 
-            File tmpCacheDir = formatTmpDirName(cacheDir);
+                File tmpCacheDir = formatTmpDirName(cacheDir);
 
-            if (tmpCacheDir.exists()) {
-                throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
-                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
-            }
+                if (tmpCacheDir.exists()) {
+                    throw new IgniteCheckedException("Unable to restore cache group, temp directory already exists " +
+                        "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+                }
 
-            if (!tmpCacheDir.mkdir()) {
-                throw new IgniteCheckedException("Unable to restore cache group, cannot create temp directory " +
-                    "[group=" + grpName + ", dir=" + tmpCacheDir + ']');
+                pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
             }
-
-            cacheDirs.add(cacheDir);
-
-            pageStore.readCacheConfigurations(snpCacheDir, cfgsByName);
         }
 
         Map<Integer, StoredCacheData> cfgsById =
             cfgsByName.values().stream().collect(Collectors.toMap(v -> CU.cacheId(v.config().getName()), v -> v));
 
-        return new SnapshotRestoreContext(req, cacheDirs, cfgsById);
+        return new SnapshotRestoreContext(req, discoCache0, cfgsById, cctx.localNodeId(), metas);
     }
 
     /**
@@ -770,7 +685,7 @@ public class SnapshotRestoreProcess {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res, Map<UUID, Exception> errs) {
+    private void finishPrepare(UUID reqId, Map<UUID, SnapshotRestoreOperationResponse> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
@@ -787,31 +702,358 @@ public class SnapshotRestoreProcess {
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+                    opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration; its affinity function is used to create the affinity cache.
+     * @param cache Discovery cache; the affinity is calculated for {@code cache.version()}.
+     * @return Affinity assignment cache created for the given configuration and calculated on the given discovery cache.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx, // Kernal context handed to GridAffinityAssignmentCache.create().
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Non-empty set of partitions to search for; must not contain the index partition.
+     * @return First metadata whose group partitions (index partition excluded) equal {@code parts}, or {@code null} otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? new HashSet<>() : new HashSet<>(grpParts); // Always mutable: remove() follows.
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future completed when the preload ends: {@code true} on success, or failed with the first recorded error.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx; // Local snapshot of the field; every later access must go through opCtx0.
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+        if (opCtx0.dirs.isEmpty()) // No cache directories were collected for this operation - nothing to preload.
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+            }
+
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ? // Other nodes get a completed future.
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx0,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+                                .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx0,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            List<PartitionRestoreFuture> rmtAwaitParts = rmtLoadParts.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            // This is necessary for sending only one partitions request per each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(
+                opCtx0.metasPerNode.entrySet()
+                    .stream()
+                    .filter(e -> !e.getKey().equals(ctx.localNodeId()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId)));
+
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + ']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+                    ctx.cache().context().snapshotMgr()
+                        .requestRemoteSnapshotFiles(m.getKey(),
+                            opCtx0.snpName,
+                            m.getValue(),
+                            opCtx0.stopChecker,
+                            (snpFile, t) -> {
+                                if (opCtx0.stopChecker.getAsBoolean())
+                                    throw new IgniteInterruptedException("Snapshot remote operation request cancelled.");
+
+                                if (t == null) {
+                                    int grpId = CU.cacheId(cacheGroupName(snpFile.getParentFile()));
+                                    int partId = partId(snpFile.getName());
+
+                                    PartitionRestoreFuture partFut = F.find(opCtx0.locProgress.get(grpId),
+                                        null,
+                                        new IgnitePredicate<PartitionRestoreFuture>() {
+                                            @Override public boolean apply(PartitionRestoreFuture f) {
+                                                return f.partId == partId;
+                                            }
+                                        });
+
+                                    assert partFut != null : snpFile.getAbsolutePath();
+
+                                    File tmpCacheDir = formatTmpDirName(grpToDir.get(grpId));
+
+                                    Path partFile = Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName());
+
+                                    try {
+                                        IgniteSnapshotManager.copy(snpMgr.ioFactory(),
+                                            snpFile,
+                                            partFile.toFile(),
+                                            snpFile.length());
+
+                                        partFut.complete(partFile);
+                                    }
+                                    catch (Exception e) {
+                                        opCtx0.errHnd.accept(e);
+                                        completeListExceptionally(rmtAwaitParts, e);
+                                    }
+                                }
+                                else {
+                                    opCtx0.errHnd.accept(t);
+                                    completeListExceptionally(rmtAwaitParts, t);
+                                }
+                            });
+                }
+            }
+            catch (IgniteCheckedException e) {
+                opCtx0.errHnd.accept(e);
+                completeListExceptionally(rmtAwaitParts, e);
+            }
+
+            List<PartitionRestoreFuture> allParts = opCtx0.locProgress.values().stream().flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            int size = allParts.size();
+
+            CompletableFuture.allOf(allParts.toArray(new CompletableFuture[size]))
+                .runAfterBothAsync(metaFut, () -> {
+                    try {
+                        if (opCtx0.stopChecker.getAsBoolean())
+                            throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
+
+                        for (File src : opCtx0.dirs)
+                            Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
+                    }
+                    catch (IOException e) {
+                        throw new IgniteException(e);
+                    }
+                }, snpMgr.snapshotExecutorService())
+                .whenComplete((r, t) -> opCtx0.errHnd.accept(t)) // Records the first failure; leaves err unset when t is null.
+                .whenComplete((res, t) -> {
+                    Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
+
+                    if (t0 == null)
+                        retFut.onDone(true);
+                    else {
+                        log.error("Unable to restore cache group(s) from a snapshot " +
+                            "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ']', t0);
+
+                        retFut.onDone(t0);
+                    }
+                });
+        }
+        catch (Exception ex) {
+            opCtx0.errHnd.accept(ex);
+
+            return new GridFinishedFuture<>(ex);
+        }
+
+        return retFut;
+    }
+
+    /**
+     * @param reqId Request ID; also used to start the follow-up rollback or cache start process on the coordinator.
+     * @param res Results of the preload phase keyed by the id of the responding node.
+     * @param errs Errors keyed by node id; the first one found (or a missing required node) triggers the rollback.
+     */
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes(), res.keySet())); // Evaluated eagerly, even when errs is not empty.
+
+        opCtx0.errHnd.accept(failure); // Leaves the context error unchanged when failure is null or an error is already set.
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
             cacheStartProc.start(reqId, reqId);
     }
 
@@ -843,7 +1085,8 @@ public class SnapshotRestoreProcess {
 
         // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
         // the cluster during the cache startup, the whole procedure will be rolled back.
-        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false,
+            IgniteUuid.fromUuid(reqId));
     }
 
     /**
@@ -858,7 +1101,7 @@ public class SnapshotRestoreProcess {
         SnapshotRestoreContext opCtx0 = opCtx;
 
         Exception failure = errs.values().stream().findFirst().
-            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+            orElse(checkNodeLeft(opCtx0.nodes(), res.keySet()));
 
         if (failure == null) {
             finishProcess(reqId);
@@ -873,11 +1116,48 @@ public class SnapshotRestoreProcess {
     }
 
     /**
+     * @param metas Map of snapshot metadata distribution across the cluster (node id to the metadata found on that node).
+     * @return Map of node id to (cache group id to partition ids) built from the partitions accepted by {@code filter}.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, List<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter // Tested with (group id, partition id); the partition is kept only on true.
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        List<UUID> nodes = new ArrayList<>(metas.keySet());
+        Collections.shuffle(nodes); // Randomizes the order in which nodes are offered to the filter.
+
+        Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>(); // Keeps the shuffled iteration order.
+        nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
+
+        for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
+                Map<Integer, Set<Integer>> parts = ofNullable(meta.partitions()).orElse(Collections.emptyMap());
+
+                for (Map.Entry<Integer, Set<Integer>> metaParts : parts.entrySet()) {
+                    for (Integer partId : metaParts.getValue()) {
+                        if (filter.test(metaParts.getKey(), partId)) {
+                            nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
+                                .computeIfAbsent(metaParts.getKey(), k -> new HashSet<>())
+                                .add(partId);
+                        }
+                    }
+                }
+            }
+        }
+
+        return nodeToSnp;
+    }
+
+    /**
      * @param reqNodes Set of required topology nodes.
      * @param respNodes Set of responding topology nodes.
      * @return Error, if no response was received from the required topology node.
      */
-    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
+    private Exception checkNodeLeft(Collection<UUID> reqNodes, Set<UUID> respNodes) {
         if (!respNodes.containsAll(reqNodes)) {
             Set<UUID> leftNodes = new HashSet<>(reqNodes);
 
@@ -907,46 +1187,46 @@ public class SnapshotRestoreProcess {
 
         synchronized (this) {
             opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+        }
 
-            try {
-                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
-                    if (log.isInfoEnabled()) {
-                        log.info("Removing restored cache directories [reqId=" + reqId +
-                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
-                    }
+        try {
+            ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
+                if (log.isInfoEnabled()) {
+                    log.info("Removing restored cache directories [reqId=" + reqId +
+                        ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
+                }
 
-                    IgniteCheckedException ex = null;
+                IgniteCheckedException ex = null;
 
-                    for (File cacheDir : opCtx0.dirs) {
-                        File tmpCacheDir = formatTmpDirName(cacheDir);
+                for (File cacheDir : opCtx0.dirs) {
+                    File tmpCacheDir = formatTmpDirName(cacheDir);
 
-                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+                    if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
+                        log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
+                            "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
 
-                            ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
-                        }
+                        ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
+                    }
 
-                        if (cacheDir.exists() && !U.delete(cacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+                    if (cacheDir.exists() && !U.delete(cacheDir)) {
+                        log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
+                            "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
 
-                            ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
-                        }
+                        ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
                     }
+                }
 
-                    if (ex != null)
-                        retFut.onDone(ex);
-                    else
-                        retFut.onDone(true);
-                });
-            }
-            catch (RejectedExecutionException e) {
-                log.error("Unable to perform rollback routine, task has been rejected " +
-                    "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+                if (ex != null)
+                    retFut.onDone(ex);
+                else
+                    retFut.onDone(true);
+            });
+        }
+        catch (RejectedExecutionException e) {
+            log.error("Unable to perform rollback routine, task has been rejected " +
+                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
 
-                retFut.onDone(e);
-            }
+            retFut.onDone(e);
         }
 
         return retFut;
@@ -968,8 +1248,8 @@ public class SnapshotRestoreProcess {
 
         SnapshotRestoreContext opCtx0 = opCtx;
 
-        if (!res.keySet().containsAll(opCtx0.nodes)) {
-            Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes);
+        if (!res.keySet().containsAll(opCtx0.nodes())) {
+            Set<UUID> leftNodes = new HashSet<>(opCtx0.nodes());
 
             leftNodes.removeAll(res.keySet());
 
@@ -981,6 +1261,83 @@ public class SnapshotRestoreProcess {
     }
 
     /**
+     * @param mgr Ignite snapshot manager; provides the I/O factory and the executor the copy runs on.
+     * @param opCtx Snapshot operation context; its stop checker aborts the copy and its error handler receives failures.
+     * @param srcDir Snapshot directory to copy from.
+     * @param targetDir Destination directory to copy to.
+     */
+    private static void copyLocalAsync(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File srcDir,
+        File targetDir,
+        PartitionRestoreFuture partFut // Selects the partition file by partId; completed with the copied path or the failure.
+    ) {
+        File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
+        Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+        CompletableFuture.supplyAsync(() -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted()) // Note: Thread.interrupted() also clears the interrupt flag of the worker thread.
+                    throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());
+
+                return partFile;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t)) // Records the failure in the context before the future is completed.
+            .whenComplete((r, t) -> {
+                if (t == null)
+                    partFut.complete(partFile);
+                else
+                    partFut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param affCache Affinity cache; its ideal assignment is consulted for every partition index.
+     * @param node Cluster node to get assigned partitions.
+     * @return Set of objects created by {@code factory} for each partition whose ideal assignment contains the given node.
+     */
+    private static <T> Set<T> nodeAffinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory // Maps a partition id to the element placed into the resulting set.
+    ) {
+        return IntStream.range(0, affCache.partitions())
+            .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
+            .mapToObj(factory)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param col List of partition restore futures to complete exceptionally.
+     * @param ex Exception every future in the list is completed with.
+     */
+    private static void completeListExceptionally(List<PartitionRestoreFuture> col, Throwable ex) {
+        for (PartitionRestoreFuture f : col)
+            f.completeExceptionally(ex);
+    }
+
+    /**
+     * @param map Map of cache group id to the set of its partition ids.
+     * @return String form of a map with the same keys whose values are the partition sets passed through {@code S.compact}.
+     */
+    private static String partitionsMapToCompactString(Map<Integer, Set<Integer>> map) {
+        return map.entrySet()
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> S.compact(e.getValue())))
+            .toString();
+    }
+
+    /**
      * Cache group restore from snapshot operation context.
      */
     private static class SnapshotRestoreContext {
@@ -990,15 +1347,43 @@ public class SnapshotRestoreProcess {
         /** Snapshot name. */
         private final String snpName;
 
-        /** Baseline node IDs that must be alive to complete the operation. */
-        private final Set<UUID> nodes;
+        /** Baseline discovery cache for node IDs that must be alive to complete the operation. */
+        private final DiscoCache discoCache;
 
-        /** List of restored cache group directories. */
-        private final Collection<File> dirs;
+        /** Operational node id. */
+        private final UUID opNodeId;
+
+        /**
+         * Set of restored cache groups path on local node. Collected when all cache configurations received
+         * from the <tt>prepare</tt> distributed process.
+         */
+        private final Set<File> dirs = new HashSet<>();
 
         /** The exception that led to the interruption of the process. */
         private final AtomicReference<Throwable> err = new AtomicReference<>();
 
+        /** Distribution of snapshot metadata files across the cluster. */
+        private final Map<UUID, List<SnapshotMetadata>> metasPerNode = new HashMap<>();
+
+        /** Context error handler. */
+        private final Consumer<Throwable> errHnd = (ex) -> err.compareAndSet(null, ex);
+
+        /** Stop condition checker. */
+        private final BooleanSupplier stopChecker = () -> err.get() != null;
+
+        /** Progress of processing cache group partitions on the local node. */
+        private final Map<Integer, Set<PartitionRestoreFuture>> locProgress = new HashMap<>();
+
+        /**
+         * The stop future responsible for stopping cache groups during the rollback phase. Will be completed when the rollback
+         * process executes and all the cache group stop actions complete (the processCacheStopRequestOnExchangeDone finishes
+         * successfully and all the data is deleted from disk).
+         */
+        private final GridFutureAdapter<Void> locStopCachesCompleteFut = new GridFutureAdapter<>();
+
+        /** Calculated affinity assignment cache per each cache group. */
+        private final Map<String, GridAffinityAssignmentCache> affCache = new ConcurrentHashMap<>();
+
         /** Cache ID to configuration mapping. */
         private volatile Map<Integer, StoredCacheData> cfgs;
 
@@ -1007,17 +1392,90 @@ public class SnapshotRestoreProcess {
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache, // Discovery cache stored in the context; nodes() derives the required node ids from it.
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId, // Key under which locMetas are registered in metasPerNode.
+            List<SnapshotMetadata> locMetas // Snapshot metadata to register for locNodeId.
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Ids of the alive baseline nodes of the stored discovery cache; they must be alive to complete the restore.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** Snapshot operation prepare response: cache configurations and snapshot metadata reported by a single node. */
+    private static class SnapshotRestoreOperationResponse implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private final List<StoredCacheData> ccfgs;
+
+        /** Snapshot metadata files on local node. */
+        private final List<SnapshotMetadata> metas;
+
+        /**
+         * @param ccfgs Cache configurations on local node; copied into a new list, so must not be {@code null}.
+         * @param metas Snapshot metadata files on local node; copied into a new list, so must not be {@code null}.
+         */
+        public SnapshotRestoreOperationResponse(
+            Collection<StoredCacheData> ccfgs,
+            Collection<SnapshotMetadata> metas
+        ) {
+            this.ccfgs = new ArrayList<>(ccfgs);
+            this.metas = new ArrayList<>(metas);
+        }
+    }
+
+    /** Future will be completed when partition processing ends. Futures are equal when their partition ids are equal. */
+    private static class PartitionRestoreFuture extends CompletableFuture<Path> {
+        /** Partition id; the only state used by {@code equals} and {@code hashCode}. */
+        private final int partId;
+
+        /**
+         * @param partId Partition id.
+         */
+        private PartitionRestoreFuture(int partId) {
+            this.partId = partId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PartitionRestoreFuture future = (PartitionRestoreFuture)o;
+
+            return partId == future.partId; // Completion state is ignored on purpose: a fresh instance matches by id.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(partId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(PartitionRestoreFuture.class, this);
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 730b9e1..d91b5bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -453,6 +453,11 @@ public class DistributedProcess<I extends Serializable, R extends Serializable>
         RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE,
 
         /**
+         * Cache group restore preload phase.
+         */
+        RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD,
+
+        /**
          * Cache group restore cache start phase.
          */
         RESTORE_CACHE_GROUP_SNAPSHOT_START,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 77de5cd..e7c06a1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -716,7 +716,7 @@ public class GridFunc {
      *
      * @return Closure which converts node to node ID.
      */
-    public static IgniteClosure<ClusterNode, UUID> node2id() {
+    public static IgniteClosure<? super ClusterNode, UUID> node2id() {
         return NODE2ID;
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 93ef0e2..82847cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -37,6 +37,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
@@ -358,9 +360,32 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
         Function<Integer, V> factory,
         CacheConfiguration<Integer, V>... ccfgs
     ) throws Exception {
-        for (int g = 0; g < grids; g++)
-            startGrid(optimize(getConfiguration(getTestIgniteInstanceName(g))
-                .setCacheConfiguration(ccfgs)));
+        return startGridsWithCache(grids, keys, factory, (id, cfg) -> cfg.getWorkDirectory(), ccfgs);
+    }
+
+    /**
+     * @param grids Number of ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param factory Factory which produces values.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    protected <V> IgniteEx startGridsWithCache(
+        int grids,
+        int keys,
+        Function<Integer, V> factory,
+        BiFunction<Integer, IgniteConfiguration, String> newWorkDir,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        for (int g = 0; g < grids; g++) {
+            IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(g))
+                .setCacheConfiguration(ccfgs));
+
+            cfg.setWorkDirectory(newWorkDir.apply(g, cfg));
+
+            startGrid(cfg);
+        }
 
         IgniteEx ig = grid(0);
 
@@ -519,32 +544,6 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param snpName Unique snapshot name.
-     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
-     * @param snpSndr Sender which used for snapshot sub-task processing.
-     * @return Future which will be completed when snapshot is done.
-     */
-    protected SnapshotFutureTask startLocalSnapshotTask(
-        GridCacheSharedContext<?, ?> cctx,
-        String snpName,
-        Map<Integer, Set<Integer>> parts,
-        SnapshotSender snpSndr
-    ) throws IgniteCheckedException {
-        SnapshotFutureTask snpFutTask = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts, encryption, snpSndr);
-
-        snpFutTask.start();
-
-        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
-        // due to checkpoint already running and we need to schedule the next one
-        // right after current will be completed.
-        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
-
-        snpFutTask.started().get();
-
-        return snpFutTask;
-    }
-
-    /**
      * @param grids Grids to block snapshot executors.
      * @return Wrapped snapshot executor list.
      */
@@ -604,6 +603,39 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param snpName Unique snapshot name.
+     * @param parts Map of cache group IDs to the sets of partitions to be included in the snapshot.
+     * @param snpSndr Sender which used for snapshot sub-task processing.
+     * @return Future which will be completed when snapshot is done.
+     */
+    protected static IgniteInternalFuture<?> startLocalSnapshotTask(
+        GridCacheSharedContext<?, ?> cctx,
+        String snpName,
+        Map<Integer, Set<Integer>> parts,
+        boolean withMetaStorage,
+        SnapshotSender snpSndr
+    ) throws IgniteCheckedException {
+        AbstractSnapshotFutureTask<?> task = cctx.snapshotMgr().registerSnapshotTask(snpName, cctx.localNodeId(), parts,
+            withMetaStorage, snpSndr);
+
+        if (!(task instanceof SnapshotFutureTask))
+            throw new IgniteCheckedException("Snapshot task hasn't been registered: " + task);
+
+        SnapshotFutureTask snpFutTask = (SnapshotFutureTask)task;
+
+        snpFutTask.start();
+
+        // Snapshot is still in the INIT state. beforeCheckpoint has been skipped
+        // due to checkpoint already running and we need to schedule the next one
+        // right after current will be completed.
+        cctx.database().forceCheckpoint(String.format(CP_SNAPSHOT_REASON, snpName));
+
+        snpFutTask.started().get();
+
+        return snpFutTask;
+    }
+
+    /**
      * @param ignite Ignite instance to resolve discovery spi to.
      * @return BlockingCustomMessageDiscoverySpi instance.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
index 49a3c62..c5b4ccb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java
@@ -598,6 +598,7 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
 
         snpFut.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
index 579e9c3..a876f74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java
@@ -21,12 +21,25 @@ import java.util.Collections;
 import java.util.function.Function;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.lang.IgniteFuture;
 import org.junit.runners.Parameterized;
 
 /**
  * Snapshot restore test base.
  */
 public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnapshotSelfTest {
+    /** Cache 1 name. */
+    protected static final String CACHE1 = "cache1";
+
+    /** Cache 2 name. */
+    protected static final String CACHE2 = "cache2";
+
+    /** Default shared cache group name. */
+    protected static final String SHARED_GRP = "shared";
+
     /** Parameters. Encrypted snapshots are not supported. */
     @Parameterized.Parameters(name = "Encryption is disabled")
     public static Iterable<Boolean> disabledEncryption() {
@@ -43,6 +56,29 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps
         return startGridsWithSnapshot(nodesCnt, keysCnt, false);
     }
 
+    /**
+     * @param spi Test communication spi.
+     * @param restorePhase The type of distributed process on which communication is blocked.
+     * @param grpName Cache group name.
+     * @return Snapshot restore future.
+     * @throws InterruptedException if interrupted.
+     */
+    protected IgniteFuture<Void> waitForBlockOnRestore(
+        TestRecordingCommunicationSpi spi,
+        DistributedProcess.DistributedProcessType restorePhase,
+        String grpName
+    ) throws InterruptedException {
+        spi.blockMessages((node, msg) ->
+            msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
+
+        IgniteFuture<Void> fut =
+            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
+
+        spi.waitForBlocked();
+
+        return fut;
+    }
+
     /** */
     protected class BinaryValueBuilder implements Function<Integer, Object> {
         /** Binary type name. */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index e6f93b5..b6361e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -35,7 +35,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteSnapshot;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -59,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.IgniteSpiException;
@@ -74,6 +74,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess.TMP_CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
@@ -86,15 +87,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
     /** Type name used for binary and SQL. */
     private static final String TYPE_NAME = "CustomType";
 
-    /** Cache 1 name. */
-    private static final String CACHE1 = "cache1";
-
-    /** Cache 2 name. */
-    private static final String CACHE2 = "cache2";
-
-    /** Default shared cache group name. */
-    private static final String SHARED_GRP = "shared";
-
     /** Reset consistent ID flag. */
     private boolean resetConsistentId;
 
@@ -164,14 +156,25 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         awaitPartitionMapExchange();
 
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
         // Restore all cache groups.
         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
 
+        awaitPartitionMapExchange(true, true, null, true);
+
         assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
         assertCacheKeys(ignite.cache(CACHE1), CACHE_KEYS_RANGE);
         assertCacheKeys(ignite.cache(CACHE2), CACHE_KEYS_RANGE);
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        // Ensure that no remote snapshot requests occurred.
+        for (Ignite g : G.allGrids()) {
+            assertTrue("Snapshot files remote requests must not happened due to all the files are available locally",
+                TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -364,14 +367,10 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         resetBaselineTopology();
 
-        IgniteFuture<Void> fut =
-            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
-        GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), IgniteIllegalStateException.class, null);
-
-        ensureCacheAbsent(dfltCacheCfg);
-
-        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
+        assertCacheKeys(grid(0).cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
     }
 
     /** @throws Exception If failed. */
@@ -507,7 +506,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
      */
     @Test
     public void testParallelCacheStartWithTheSameNameOnPrepare() throws Exception {
-        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, IgniteCheckedException.class,
+        checkCacheStartWithTheSameName(RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, IgniteCheckedException.class,
             "Cache start failed. A cache or group with the same name is currently being restored from a snapshot");
     }
 
@@ -555,7 +554,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(3));
 
-        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE, DEFAULT_CACHE_NAME);
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD, DEFAULT_CACHE_NAME);
 
         IgniteInternalFuture<?> fut0 = runAsync(() -> stopGrid(3, true));
 
@@ -574,12 +573,15 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED);
 
-        GridTestUtils.assertThrowsAnyCause(
-            log,
-            () -> startGrid(3),
-            IgniteSpiException.class,
-            "to add the node to cluster - remove directories with the caches"
-        );
+        dfltCacheCfg = null;
+
+        // Should start successfully.
+        Ignite ignite = startGrid(3);
+
+        resetBaselineTopology();
+        awaitPartitionMapExchange();
+
+        assertNull(ignite.cache(DEFAULT_CACHE_NAME));
     }
 
     /** @throws Exception If failed. */
@@ -594,7 +596,7 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
         CountDownLatch stopLatch = new CountDownLatch(1);
 
         spi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage &&
-            ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PREPARE.ordinal());
+            ((SingleNodeMessage<?>)msg).type() == RESTORE_CACHE_GROUP_SNAPSHOT_PRELOAD.ordinal());
 
         String failingFilePath = Paths.get(CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME,
             PART_FILE_PREFIX + (dfltCacheCfg.getAffinity().partitions() / 2) + FILE_SUFFIX).toString();
@@ -760,29 +762,6 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
     }
 
     /**
-     * @param spi Test communication spi.
-     * @param restorePhase The type of distributed process on which communication is blocked.
-     * @param grpName Cache group name.
-     * @return Snapshot restore future.
-     * @throws InterruptedException if interrupted.
-     */
-    private IgniteFuture<Void> waitForBlockOnRestore(
-        TestRecordingCommunicationSpi spi,
-        DistributedProcessType restorePhase,
-        String grpName
-    ) throws InterruptedException {
-        spi.blockMessages((node, msg) ->
-            msg instanceof SingleNodeMessage && ((SingleNodeMessage<?>)msg).type() == restorePhase.ordinal());
-
-        IgniteFuture<Void> fut =
-            grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(grpName));
-
-        spi.waitForBlocked();
-
-        return fut;
-    }
-
-    /**
      * Custom I/O factory to preprocessing created files.
      */
     private static class CustomFileIOFactory implements FileIOFactory {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index 55468f4..dd83af1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -131,7 +131,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         }, 5, "cache-loader-");
 
         // Register task but not schedule it on the checkpoint.
-        SnapshotFutureTask snpFutTask = mgr.registerSnapshotTask(SNAPSHOT_NAME,
+        SnapshotFutureTask snpFutTask = (SnapshotFutureTask)mgr.registerSnapshotTask(SNAPSHOT_NAME,
             cctx.localNodeId(),
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
             encryption,
@@ -256,6 +256,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME));
 
         // Check the right exception thrown.
@@ -279,6 +280,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> fut = startLocalSnapshotTask(ig.context().cache().context(),
             SNAPSHOT_NAME,
             parts,
+            encryption,
             new DelegateSnapshotSender(log, mgr0.snapshotExecutorService(),
                 mgr0.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
                 @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
@@ -314,6 +316,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest {
         IgniteInternalFuture<?> snpFut = startLocalSnapshotTask(cctx0,
             SNAPSHOT_NAME,
             F.asMap(CU.cacheId(DEFAULT_CACHE_NAME), null),
+            encryption,
             new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.localSnapshotSenderFactory().apply(SNAPSHOT_NAME)) {
                 @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
                     try {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
new file mode 100644
index 0000000..8d78836
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.TransmissionCancelledException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheGroupName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** Tests requesting snapshot partition files from a remote node. */
+public class IgniteSnapshotRemoteRequestTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestFromSingleNode() throws Exception {
+        int rqCnt = 10;
+
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), grid(1).localNode().id());
+
+        awaitPartitionMapExchange();
+
+        CountDownLatch latch = new CountDownLatch(parts.values().stream().mapToInt(Set::size).sum() * rqCnt);
+        GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<?> runFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                Map<Integer, Set<Integer>> parts0 = new HashMap<>();
+
+                for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+                    parts0.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
+
+                IgniteInternalFuture<Void> locFut = null;
+
+                compFut.add(locFut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+                    SNAPSHOT_NAME,
+                    parts,
+                    () -> false,
+                    defaultPartitionConsumer(parts0, latch)));
+
+                locFut.listen(f -> assertEquals("All partitions must be handled: " + parts0,
+                    F.size(parts0.values(), Set::isEmpty), parts0.size()));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }, rqCnt, "rq-creator-");
+
+        runFut.get(TIMEOUT);
+        U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+        compFut.markInitialized().get(TIMEOUT);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRemoteRequestEachOther() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        IgniteSnapshotManager mgr0 = snp(ignite);
+        IgniteSnapshotManager mgr1 = snp(grid(1));
+
+        UUID node0 = grid(0).localNode().id();
+        UUID node1 = grid(1).localNode().id();
+
+        Map<Integer, Set<Integer>> fromNode1 = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), node1);
+        Map<Integer, Set<Integer>> fromNode0 = owningParts(grid(1), CU.cacheId(DEFAULT_CACHE_NAME), node0);
+
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g)
+            .blockMessages((n, msg) -> msg instanceof SnapshotFilesRequestMessage));
+
+        CountDownLatch latch = new CountDownLatch(fromNode1.values().stream().mapToInt(Set::size).sum() +
+            fromNode0.values().stream().mapToInt(Set::size).sum());
+
+        // Snapshot must be taken on node1 and transmitted to node0.
+        IgniteInternalFuture<?> futFrom1To0 = mgr0.requestRemoteSnapshotFiles(node1, SNAPSHOT_NAME, fromNode1, () -> false,
+            defaultPartitionConsumer(fromNode1, latch));
+        IgniteInternalFuture<?> futFrom0To1 = mgr1.requestRemoteSnapshotFiles(node0, SNAPSHOT_NAME, fromNode0, () -> false,
+            defaultPartitionConsumer(fromNode0, latch));
+
+        G.allGrids().forEach(g -> TestRecordingCommunicationSpi.spi(g).stopBlock());
+
+        latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+
+        futFrom0To1.get(TIMEOUT);
+        futFrom1To0.get(TIMEOUT);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testRemoteRequestedInitiatorNodeLeft() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        awaitPartitionMapExchange();
+
+        IgniteSnapshotManager mgr1 = snp(grid(1));
+        UUID rmtNodeId = grid(1).localNode().id();
+        UUID locNodeId = grid(0).localNode().id();
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), rmtNodeId);
+
+        CountDownLatch sndLatch = new CountDownLatch(1);
+
+        mgr1.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, mgr1.snapshotExecutorService(), mgr1.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                        if (partId(part.getName()) > 0) {
+                            try {
+                                sndLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+                            }
+                            catch (Exception e) {
+                                throw new IgniteException(e);
+                            }
+                        }
+
+                        super.sendPart0(part, cacheDirName, pair, length);
+                    }
+                };
+            }
+        });
+
+        snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+            SNAPSHOT_NAME,
+            parts,
+            () -> false,
+            (part, t) -> {
+            });
+
+        IgniteInternalFuture<?>[] futs = new IgniteInternalFuture[1];
+
+        assertTrue(waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                IgniteInternalFuture<?> snpFut = snp(grid(1)).lastScheduledSnapshotResponseRemoteTask(locNodeId);
+
+                if (snpFut == null)
+                    return false;
+                else {
+                    futs[0] = snpFut;
+
+                    return true;
+                }
+            }
+        }, 5_000L));
+
+        stopGrid(0);
+        sndLatch.countDown();
+
+        GridTestUtils.assertThrowsAnyCause(log, () -> futs[0].get(TIMEOUT), ClusterTopologyCheckedException.class, null);
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testSnapshotRequestRemoteSourceNodeLeft() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        UUID rmtNodeId = grid(1).localNode().id();
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME), rmtNodeId);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = snp(ignite).requestRemoteSnapshotFiles(rmtNodeId,
+            SNAPSHOT_NAME,
+            parts,
+            () -> false,
+            (part, t) -> {
+                if (t != null) {
+                    assertTrue(t instanceof ClusterTopologyCheckedException);
+                    assertNull(part);
+
+                    return;
+                }
+
+                int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+                assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+                assertTrue("Received partition has not been requested", parts.get(grpId).contains(partId(part.getName())));
+
+                try {
+                    U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+
+        stopGrid(1);
+        // Unblock the partition consumer awaiting on the latch.
+        latch.countDown();
+
+        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), ClusterTopologyCheckedException.class,
+            "he node from which a snapshot has been requested left the grid");
+    }
+
+    /**
+     * Checks cancellation of a remote snapshot files request through its stop checker.
+     * <p>
+     * The partition consumer is blocked on a latch while the stop flag is raised, and the request future
+     * is then expected to fail with {@link TransmissionCancelledException}.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testSnapshotRequestRemoteCancel() throws Exception {
+        IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        Map<Integer, Set<Integer>> parts = owningParts(ignite, CU.cacheId(DEFAULT_CACHE_NAME),
+            grid(1).localNode().id());
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean stopChecker = new AtomicBoolean();
+
+        IgniteInternalFuture<Void> fut = snp(ignite).requestRemoteSnapshotFiles(grid(1).localNode().id(),
+            SNAPSHOT_NAME,
+            parts,
+            stopChecker::get,
+            (part, t) -> {
+                try {
+                    // Block the consumer until the test has raised the stop flag and released the latch.
+                    U.await(latch, TIMEOUT, TimeUnit.MILLISECONDS);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            });
+
+        UUID locNodeId = grid(0).localNode().id();
+
+        // Wait until the remote node has scheduled a response task for the local node. Only the presence
+        // of the task matters here, so the task future itself is not kept.
+        assertTrue(waitForCondition(
+            () -> snp(grid(1)).lastScheduledSnapshotResponseRemoteTask(locNodeId) != null,
+            5_000L));
+
+        // Raise the stop flag before releasing the consumer blocked on the latch.
+        stopChecker.set(true);
+        latch.countDown();
+
+        assertThrowsAnyCause(log, () -> fut.get(TIMEOUT), TransmissionCancelledException.class,
+            "Future cancelled prior to the all requested partitions processed");
+    }
+
+    /**
+     * @param parts Expected partitions by cache group id; a received partition is removed from its group's set.
+     * @param latch Latch counted down once per accepted partition file.
+     * @return Consumer asserting that no error is passed and that each received partition has been requested.
+     */
+    private static BiConsumer<File, Throwable> defaultPartitionConsumer(Map<Integer, Set<Integer>> parts, CountDownLatch latch) {
+        return (part, t) -> {
+            assertNull(t);
+
+            int grpId = CU.cacheId(cacheGroupName(part.getParentFile()));
+
+            assertTrue("Received cache group has not been requested", parts.containsKey(grpId));
+            assertTrue("Received partition has not been requested",
+                parts.get(grpId).remove(partId(part.getName())));
+
+            latch.countDown();
+        };
+    }
+
+    /**
+     * @param src Node whose cache group topology is queried.
+     * @param grpId Group id to collect OWNING partitions.
+     * @param rmtNodeId Remote node id.
+     * @return Singleton map of the group id to the partitions the topology reports as OWNING for the remote node.
+     */
+    private static Map<Integer, Set<Integer>> owningParts(IgniteEx src, int grpId, UUID rmtNodeId) {
+        return Collections.singletonMap(grpId, src.context()
+            .cache()
+            .cacheGroup(grpId)
+            .topology()
+            .partitions(rmtNodeId)
+            .entrySet()
+            .stream()
+            .filter(p -> p.getValue() == GridDhtPartitionState.OWNING)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet()));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
new file mode 100644
index 0000000..2c532fd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** */
+    private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+    /** */
+    private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+    /** */
+    private static final String CACHE_WITH_NODE_FILTER = "cacheWithFilter";
+
+    /** Node filter to test restoring on some nodes only. */
+    private static final IgnitePredicate<ClusterNode> ZERO_SUFFIX_NODE_FILTER = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode node) {
+            return node.consistentId().toString().endsWith("0");
+        }
+    };
+
+    /** {@code true} if snapshot parts have been initialized on test-class startup. */
+    private static boolean inited;
+
+    /** Snapshot parts on dedicated cluster. Each part has its own local directory. */
+    private static final Set<Path> snpParts = new HashSet<>();
+
+    /** */
+    private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
+        new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
+            @Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
+                return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
+                    prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+            }
+        };
+
+    /** Cache value builder. */
+    private final Function<Integer, Object> valBuilder = String::valueOf;
+
+    /** @throws Exception If fails. */
+    @Before
+    public void prepareDedicatedSnapshot() throws Exception {
+        if (!inited) {
+            cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+
+            CacheConfiguration<Integer, Object> cacheCfg1 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+            CacheConfiguration<Integer, Object> cacheCfg2 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+            CacheConfiguration<Integer, Object> cacheCfg3 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE_WITH_NODE_FILTER))
+                    .setBackups(1)
+                    .setAffinity(new RendezvousAffinityFunction(false, 16))
+                    .setNodeFilter(ZERO_SUFFIX_NODE_FILTER);
+
+            IgniteEx ignite = startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE, valBuilder,
+                dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2, cacheCfg3);
+
+            ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+            awaitPartitionMapExchange();
+            stopAllGrids();
+
+            snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX, SNAPSHOT_NAME));
+
+            inited = true;
+        }
+
+        beforeTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If fails. */
+    @After
+    public void afterSwitchSnapshot() throws Exception {
+        afterTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** */
+    @AfterClass
+    public static void cleanupSnapshot() {
+        snpParts.forEach(U::delete);
+        cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreAllGroups() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(SnapshotFilesRequestMessage.class);
+
+        // Restore all cache groups.
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertCacheKeys(scc.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE1), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE2), CACHE_KEYS_RANGE);
+
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        List<Object> msgs = new ArrayList<>();
+
+        for (Ignite g : G.allGrids())
+            msgs.addAll(TestRecordingCommunicationSpi.spi(g).recordedMessages(true));
+
+        assertPartitionsDuplicates(msgs);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreNoRebalance() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        for (Ignite g : G.allGrids())
+            TestRecordingCommunicationSpi.spi(g).record(GridDhtPartitionDemandMessage.class);
+
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE_WITH_NODE_FILTER)).get(TIMEOUT);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertCacheKeys(scc.cache(CACHE_WITH_NODE_FILTER), CACHE_KEYS_RANGE);
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+
+        for (Ignite g : G.allGrids())
+            assertTrue(TestRecordingCommunicationSpi.spi(g).recordedMessages(true).isEmpty());
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        IgniteSnapshotManager mgr = snp(grid(1));
+        mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                        if (partId(part.getName()) > 0)
+                            throw new IgniteException("Test exception. Uploading partition file failed: " + pair);
+
+                        super.sendPart0(part, cacheDirName, pair, length);
+                    }
+                };
+            }
+        });
+
+        IgniteFuture<?> fut = grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            () -> fut.get(TIMEOUT),
+            IgniteException.class,
+            "Test exception. Uploading partition file failed");
+        assertNull(scc.cache(DEFAULT_CACHE_NAME));
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /**
+     * Copies each snapshot part into the snapshot work directory of one of the given nodes, in round-robin order.
+     *
+     * @param snpParts Snapshot parts.
+     * @param toNodes List of nodes to copy parts to.
+     */
+    private static void copyAndShuffle(Set<Path> snpParts, List<Ignite> toNodes) {
+        int idx = 0;
+
+        for (Path part : snpParts) {
+            try {
+                IgniteEx loc = (IgniteEx)toNodes.get(idx++ % toNodes.size());
+                File dest = Paths.get(resolveSnapshotWorkDirectory(loc.configuration()).getAbsolutePath(),
+                    part.getFileName().toString()).toFile();
+                U.copy(part.toFile(), dest, false);
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * @param clusterPrefix Name prefixes of the directories to delete from the default work directory.
+     */
+    private static void cleanupDedicatedPersistenceDirs(String... clusterPrefix) {
+        for (String prefix : clusterPrefix) {
+            try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+                path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+            ) {
+                for (Path dir : ds)
+                    U.delete(dir);
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * @return Paths found by searching for {@code snpName} in each work sub-directory whose name starts with {@code prefix}.
+     */
+    private static Set<Path> findSnapshotParts(String prefix, String snpName) {
+        Set<Path> snpPaths = new HashSet<>();
+
+        try (DirectoryStream<Path> ds = Files.newDirectoryStream(defaultWorkDirectory(),
+            path -> Files.isDirectory(path) && path.getFileName().toString().toLowerCase().startsWith(prefix))
+        ) {
+            for (Path dir : ds)
+                snpPaths.add(searchDirectoryRecursively(dir, snpName)
+                    .orElseThrow(() -> new IgniteException("Snapshot not found in the Ignite work directory " +
+                        "[dir=" + dir.toString() + ", snpName=" + snpName + ']')));
+
+            return snpPaths;
+        }
+        catch (IOException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param prefix Grid work directory prefix.
+     * @param grids Number of ignite instances to start.
+     * @param keys Number of keys to create.
+     * @param valMapper Factory which produces values.
+     * @param ccfgs Cache configurations passed to {@code startGridsWithCache}.
+     * @param <V> Cache value type.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    private <V> IgniteEx startDedicatedGridsWithCache(
+        String prefix,
+        int grids,
+        int keys,
+        Function<Integer, V> valMapper,
+        CacheConfiguration<Integer, V>... ccfgs
+    ) throws Exception {
+        BiFunction<Integer, IgniteConfiguration, String> workDirResolver = CLUSTER_DIR.apply(prefix);
+
+        return startGridsWithCache(grids, keys, valMapper, workDirResolver, ccfgs);
+    }
+
+    /**
+     * @param grids Number of ignite instances to start.
+     * @return Ignite coordinator instance.
+     * @throws Exception If fails.
+     */
+    private IgniteEx startDedicatedGrids(String prefix, int grids) throws Exception {
+        for (int g = 0; g < grids; g++)
+            startDedicatedGrid(prefix, g);
+
+        grid(0).events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT);
+
+        return grid(0);
+    }
+
+    /**
+     * @param prefix Grid work directory prefix.
+     * @param id Grid index.
+     * @return Grid instance.
+     * @throws Exception If fails.
+     */
+    private IgniteEx startDedicatedGrid(String prefix, int id) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(id)));
+        cfg.setWorkDirectory(CLUSTER_DIR.apply(prefix).apply(id, cfg));
+
+        return startGrid(cfg);
+    }
+
+    /**
+     * @return Default work directory.
+     */
+    private static Path defaultWorkDirectory() {
+        try {
+            return Paths.get(U.defaultWorkDirectory());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Asserts that no group partition is requested more than once by the given {@link SnapshotFilesRequestMessage}s. */
+    private static void assertPartitionsDuplicates(List<Object> msgs) {
+        List<GroupPartitionId> all = new ArrayList<>();
+
+        for (Object o : msgs) {
+            SnapshotFilesRequestMessage msg0 = (SnapshotFilesRequestMessage)o;
+            Map<Integer, Set<Integer>> parts = msg0.parts();
+
+            for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
+                for (Integer partId : e.getValue())
+                    all.add(new GroupPartitionId(e.getKey(), partId));
+            }
+        }
+
+        assertEquals(all.size(), new HashSet<>(all).size());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
index 402f2f7..12dc32d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicWithPersistenceTestSuite.java
@@ -47,6 +47,8 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCl
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
 import org.apache.ignite.internal.processors.performancestatistics.CacheStartTest;
 import org.apache.ignite.internal.processors.performancestatistics.CheckpointTest;
@@ -101,12 +103,14 @@ import org.junit.runners.Suite;
 
     IgniteSnapshotManagerSelfTest.class,
     IgniteClusterSnapshotSelfTest.class,
+    IgniteSnapshotRemoteRequestTest.class,
     IgniteClusterSnapshotCheckTest.class,
     IgniteSnapshotWithMetastorageTest.class,
     IgniteSnapshotMXBeanTest.class,
     IgniteClusterSnapshotRestoreSelfTest.class,
     IgniteClusterSnapshotHandlerTest.class,
     EncryptedSnapshotTest.class,
+    IgniteSnapshotRestoreFromRemoteTest.class,
 
     IgniteClusterIdTagTest.class,
     FullyConnectedComponentSearcherTest.class,
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
index 11f5f8c..7db3ff6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
@@ -35,11 +35,14 @@ import org.apache.ignite.events.SnapshotEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static java.util.Optional.ofNullable;
 import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
 import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
 import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
@@ -54,9 +57,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** Number of cache keys to pre-create at node start. */
     private static final int CACHE_KEYS_RANGE = 10_000;
 
-    /** Cache value builder. */
-    private Function<Integer, Object> valBuilder = new BinaryValueBuilder(TYPE_NAME);
-
     /** {@inheritDoc} */
     @Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
         return super.txCacheConfig(ccfg).setSqlIndexMaxInlineSize(255).setSqlSchema("PUBLIC")
@@ -67,11 +67,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
                 .setIndexes(Collections.singletonList(new QueryIndex("id")))));
     }
 
-    /** {@inheritDoc} */
-    @Override protected Function<Integer, Object> valueBuilder() {
-        return valBuilder;
-    }
-
     /** @throws Exception If failed. */
     @Test
     public void testBasicClusterSnapshotRestore() throws Exception {
@@ -82,6 +77,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
         assertCacheKeys(client.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertRebuildIndexes(client.cache(DEFAULT_CACHE_NAME), false);
 
         waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
     }
@@ -89,6 +85,8 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** @throws Exception If failed. */
     @Test
     public void testBasicClusterSnapshotRestoreWithMetadata() throws Exception {
+        valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
         IgniteEx ignite = startGridsWithSnapshot(2, CACHE_KEYS_RANGE);
 
         // Remove metadata.
@@ -101,6 +99,7 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         ignite.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME)).get(TIMEOUT);
 
         assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+        assertRebuildIndexes(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), false);
 
         for (Ignite grid : G.allGrids())
             assertNotNull(((IgniteEx)grid).context().cacheObjects().metadata(typeId));
@@ -111,9 +110,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
     /** @throws Exception If failed. */
     @Test
     public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
+        valBuilder = new BinaryValueBuilder(TYPE_NAME);
+
         int nodesCnt = 4;
 
-        startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valBuilder, dfltCacheCfg);
+        startGridsWithCache(nodesCnt - 2, CACHE_KEYS_RANGE, valueBuilder(), dfltCacheCfg);
 
         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
 
@@ -146,7 +147,11 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
 
         awaitPartitionMapExchange();
 
-        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+        for (Ignite g : G.allGrids())
+            ofNullable(indexRebuildFuture((IgniteEx)g, CU.cacheId(DEFAULT_CACHE_NAME))).orElse(new GridFinishedFuture<>()).get(TIMEOUT);
+
+        for (Ignite g : G.allGrids())
+            assertCacheKeys(g.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
 
         GridTestUtils.waitForCondition(() -> evts.size() == 2, TIMEOUT);
         assertEquals(2, evts.size());
@@ -168,12 +173,6 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
 
             String nodeId = ctx.localNodeId().toString();
 
-            assertTrue("nodeId=" + nodeId, grid.cache(cache.getName()).indexReadyFuture().isDone());
-
-            // Make sure no index rebuild happened.
-            assertEquals("nodeId=" + nodeId,
-                0, ctx.cache().cache(cache.getName()).context().cache().metrics0().getIndexRebuildKeysProcessed());
-
             GridQueryProcessor qry = ((IgniteEx)grid).context().query();
 
             // Make sure  SQL works fine.
@@ -188,6 +187,23 @@ public class IgniteClusterSnapshotRestoreWithIndexingTest extends IgniteClusterS
         }
     }
 
+    /**
+     * @param cache Ignite cache.
+     * @param rebuild {@code true} if index rebuild is expected to have processed keys on every node.
+     */
+    private void assertRebuildIndexes(IgniteCache<Object, Object> cache, boolean rebuild) {
+        for (Ignite grid : G.allGrids()) {
+            GridKernalContext ctx = ((IgniteEx)grid).context();
+
+            assertTrue("nodeId=" + ctx.localNodeId(), grid.cache(cache.getName()).indexReadyFuture().isDone());
+
+            // Compare the expectation with whether any keys were processed by index rebuild on this node.
+            assertEquals("nodeId=" + ctx.localNodeId(),
+                rebuild, ctx.cache().cache(cache.getName()).context().cache().metrics0()
+                    .getIndexRebuildKeysProcessed() > 0);
+        }
+    }
+
     /** */
     private static class IndexedValueBuilder implements Function<Integer, Object> {
         /** {@inheritDoc} */