Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/08/11 21:23:50 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #9321: IGNITE-14744 Partition swap on checkpoint

Mmuzaf opened a new pull request #9321:
URL: https://github.com/apache/ignite/pull/9321


   Thank you for submitting the pull request to Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes the changes that have been made. 
   The description explains _WHAT_ was done and _WHY_, rather than _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary`, where `XXXX` is the JIRA issue number.
   - [ ] A reviewer has been mentioned in the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` is attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9321: IGNITE-14744 Partition swap on checkpoint

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9321:
URL: https://github.com/apache/ignite/pull/9321#discussion_r728130429



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -316,6 +371,9 @@
     /** Snapshot operation handlers. */
     private final SnapshotHandlers handlers = new SnapshotHandlers();
 
+    /** Manager to handle remote snapshot requests and receive. */

Review comment:
       receive response?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -987,14 +1044,7 @@ public void readCacheConfigurations(File dir, Map<String, StoredCacheData> ccfgs
             if (conf.exists() && conf.length() > 0) {
                 StoredCacheData cacheData = readCacheData(conf);
 
-                String cacheName = cacheData.config().getName();
-
-                if (!ccfgs.containsKey(cacheName))
-                    ccfgs.put(cacheName, cacheData);
-                else {
-                    U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "
-                        + dir.getName());
-                }
+                ccfgs.putIfAbsent(cacheData.config().getName(), readCacheData(conf));

Review comment:
       Why do we call readCacheData again? We already have cacheData.
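
       A minimal sketch of the fix this points at, reusing the `cacheData` that was just read instead of parsing the file a second time:

           StoredCacheData cacheData = readCacheData(conf);

           ccfgs.putIfAbsent(cacheData.config().getName(), cacheData);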
   

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
-        if (!errs.isEmpty()) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        opCtx0.errHnd.accept(failure);
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            cacheStartProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Throwable err = opCtx0.err.get();
+
+        if (err != null)
+            return new GridFinishedFuture<>(err);
+
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return opCtx0.cachesLoadFut;
+
+        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting restored caches " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+        }
+
+        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
+        // the cluster during the cache startup, the whole procedure will be rolled back.
+        GridCompoundFuture<Boolean, Boolean> awaitBoth = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<Boolean> cacheStartFut = ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+            true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+        // This is required for the rollback procedure to execute the cache groups stop operation.
+        cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error() == null));
+
+        // Convert exception to the RestoreCacheStartException to propagate to other nodes over the distributed process.
+        awaitBoth.add(chainCacheStartException(cacheStartFut));
+        awaitBoth.add(opCtx0.cachesLoadFut);
+
+        awaitBoth.markInitialized();
+
+        return awaitBoth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        if (F.isEmpty(grpIdsToStart))
+            return;
+
+        assert opCtx0 != null;
+        assert !opCtx0.sameTop : "WAL must be disabled only for caches restoring from snapshot taken on another cluster: " + opCtx0;
+
+        // This is happened on the exchange which has been initiated by a dynamic cache start message and intend to switch
+        // off the WAL for cache groups loading from a snapshot.
+        for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g -> grpIdsToStart.contains(g.groupId()))) {
+            assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+            // Check partitions have not even been created yet, so the PartitionMetaStateRecord won't be logged to the WAL.
+            for (int p = 0; p < grp.topology().partitions(); p++)
+                assert grp.topology().localPartition(p) == null : p;
+
+            grp.localWalEnabled(false, true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // This will be called after the processCacheStopRequestOnExchangeDone happens.
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (ctx.clientNode() || fut == null || fut.exchangeActions() == null || opCtx0 == null)
+            return;
+
+        Set<String> grpNamesToStop = fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+                requirePartitionLoad(ccfg, uuid, opCtx0))
+            .stream()
+            .map(g -> g.descriptor().cacheOrGroupName())
+            .collect(Collectors.toSet());
+
+        if (F.isEmpty(grpNamesToStop))
+            return;
+
+        assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+        opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        if (failure == null) {
+            if (opCtx0.sameTop) {
+                finishProcess(reqId);
+
+                updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+            }
+            else
+                lateAffProc.start(reqId, reqId);
+
+            return;
+        }
+
+        opCtx0.errHnd.accept(failure);
+
+        ClusterNode crd = U.oldest(ctx.discovery().aliveServerNodes(), null);
+
+        // Caches were not even been started and rollback already occurred during PME, so they are not even stared.
+        if (X.hasCause(errs.get(crd.id()), RestoreCacheStartException.class))
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            rollbackRestoreProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param grps Ordered list of cache groups sorted by priority.
+     * @param exchFut Exchange future.
+     */
+    public void onRebalanceReady(Set<CacheGroupContext> grps, @Nullable GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Set<Integer> exchGrpIds = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        Set<CacheGroupContext> filtered = grps.stream()
+            .filter(Objects::nonNull)
+            .filter(g -> exchGrpIds.contains(g.groupId()))
+            .collect(Collectors.toSet());
+
+        // Restore requests has been already processed at previous exchange.
+        if (filtered.isEmpty())
+            return;
+
+        assert opCtx0 != null;
+
+        // First preload everything from the local node.
+        List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+        Map<Integer, Set<Integer>> notScheduled = new HashMap<>();
+
+        // Register partitions to be processed.
+        for (CacheGroupContext grp : filtered) {
+            if (F.isEmpty(locMetas))
+                break;
+
+            notScheduled.put(grp.groupId(),
+                affinityPartitions(grp.affinity(), ctx.cache().context().localNode(), Integer::new));
+
+            Set<Integer> leftParts = notScheduled.get(grp.groupId());
+
+            assert !leftParts.contains(INDEX_PARTITION);
+
+            if (F.isEmpty(leftParts)) {
+                opCtx0.cachesLoadFut.onDone();
+
+                continue;
+            }
+
+            Set<PartitionRestoreLifecycleFuture> partLfs = U.newHashSet(leftParts.size());
+
+            for (Integer partId : leftParts) {
+                // Affinity node partitions are inited on exchange.
+                GridDhtLocalPartition part = grp.topology().localPartition(partId);
+                PartitionRestoreLifecycleFuture lf = PartitionRestoreLifecycleFuture.create(grp, log, partId);
+
+                // Start partition eviction first.
+                if (part == null)
+                    lf.cleared.complete(null);
+                else {
+                    part.clearAsync().listen(f -> {
+                        // This future must clear all heap cache entries too from the GridDhtLocalPartition map.
+                        if (f.error() == null)
+                            lf.cleared.complete(f.result());

Review comment:
       I don't like overusing CompletableFuture, especially in simple cases like this (across the whole patch it brings nothing but complexity). An Ignite future can simply be chained in this case, whereas with a CompletableFuture you have to write a lot of code that is much harder to read.
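
       A minimal sketch of the chaining suggested here, assuming the usual `IgniteInternalFuture.listen(...)` semantics and a `GridFutureAdapter` in place of the `CompletableFuture`:

           GridFutureAdapter<Object> cleared = new GridFutureAdapter<>();

           if (part == null)
               cleared.onDone();
           else {
               // Complete the adapter with either the clear result or the error,
               // with no CompletableFuture bridging in between.
               part.clearAsync().listen(f -> {
                   if (f.error() == null)
                       cleared.onDone(f.result());
                   else
                       cleared.onDone(f.error());
               });
           }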

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFutureTask extends GridFutureAdapter<Void> implements Runnable {

Review comment:
       It's not a good class name: we already have AbstractSnapshotFutureTask and SnapshotFutureTask, but RemoteSnapshotFutureTask is not related to these classes in any way.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1576,39 +1714,40 @@ SnapshotFutureTask registerSnapshotTask(
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
+            ioFactory, snpSndr, parts, withMetaStorage, locBuff));
+    }
+
+    /**
+     * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+     */
+    private AbstractSnapshotFutureTask<?> registerSnapshotTask(String rqId, AbstractSnapshotFutureTask<?> task) {
         if (!busyLock.enterBusy()) {
-            return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+            return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+                cctx.localNodeId() + ']'));
         }
 
         try {
-            if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
+            if (locSnpTasks.containsKey(rqId))
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +

Review comment:
       {}

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -280,6 +332,9 @@
     /** Local snapshot sender factory. */
     private Function<String, SnapshotSender> locSndrFactory = LocalSnapshotSender::new;
 
+    /** Local snapshot sender factory. */

Review comment:
       Local -> Remote?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1629,6 +1768,26 @@ void localSnapshotSenderFactory(Function<String, SnapshotSender> factory) {
         return locSndrFactory;
     }
 
+    /**
+     * @param factory Factory which produces {@link RemoteSnapshotSender} implementation.
+     */
+    void remoteSnapshotSenderFactory(BiFunction<String, UUID, SnapshotSender> factory) {
+        rmtSndrFactory = factory;
+    }
+
+    /**
+     * @param rqId Request id.
+     * @param nodeId Node id.
+     * @return Snapshot sender related to given node id.
+     */
+    RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId) {
+        return new RemoteSnapshotSender(log,
+            snpRunner,
+            () -> databaseRelativePath(pdsSettings.folderName()),

Review comment:
       Why can't we use the path directly, without the supplier?
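
       A sketch of that simplification, assuming `pdsSettings.folderName()` is fixed for the lifetime of the sender (so resolving it eagerly is safe):

           // current: recomputed lazily through a Supplier<String>
           () -> databaseRelativePath(pdsSettings.folderName())

           // suggested: resolved once and passed as a plain String
           databaseRelativePath(pdsSettings.folderName())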

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFutureTask extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String rqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Initialization procedure. */
+        private final IgniteThrowableConsumer<String> init;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param rmtNodeId Remote node id to request snapshots.
+         * @param tmpWorkDir Temporary work directory.
+         * @param init The initialization task procedure.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFutureTask(
+            UUID rmtNodeId,
+            File tmpWorkDir,
+            IgniteThrowableConsumer<String> init,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(tmpWorkDir.getAbsolutePath(), rqId);
+            this.rmtNodeId = rmtNodeId;
+            this.init = init;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                init.accept(rqId);
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFutureTask future = (RemoteSnapshotFutureTask)o;
+
+            return Objects.equals(rqId, future.rqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(rqId);

Review comment:
       `rqId.hashCode()` (don't implicitly create the varargs array and iterate over it)
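
       The suggested form, for reference:

           /** {@inheritDoc} */
           @Override public int hashCode() {
               return rqId.hashCode(); // no varargs array allocation, unlike Objects.hash(rqId)
           }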

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFutureTask extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String rqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Initialization procedure. */
+        private final IgniteThrowableConsumer<String> init;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param rmtNodeId Remote node id to request snapshots.
+         * @param tmpWorkDir Temporary work directory.
+         * @param init The initialization task procedure.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFutureTask(
+            UUID rmtNodeId,
+            File tmpWorkDir,
+            IgniteThrowableConsumer<String> init,

Review comment:
       It's better to include `init` code in `run` and pass plain parameters here
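
       A rough sketch of that shape (the `snpMgr` field and `requestRemoteSnapshot` call below are hypothetical stand-ins for whatever the `init` lambda currently captures):

           /** {@inheritDoc} */
           @Override public synchronized void run() {
               if (isDone())
                   return;

               try {
                   // Hypothetical: the initialization logic inlined here instead of
                   // an injected IgniteThrowableConsumer<String>, with its plain
                   // collaborators passed through the constructor.
                   snpMgr.requestRemoteSnapshot(rmtNodeId, rqId);
               }
               catch (Throwable t) {
                   onDone(t);
               }
           }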

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -333,6 +391,10 @@ public IgniteSnapshotManager(GridKernalContext ctx) {
         marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
 
         restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+        ctx.internalSubscriptionProcessor().registerMetastorageListener(restoreCacheGrpProc);

Review comment:
       NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -316,6 +371,9 @@
     /** Snapshot operation handlers. */
     private final SnapshotHandlers handlers = new SnapshotHandlers();
 
+    /** Manager to handle remote snapshot requests and receive. */
+    private volatile SequentialRemoteSnapshotManager snpRmtHandler;

Review comment:
       final

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFutureTask extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String rqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Initialization procedure. */
+        private final IgniteThrowableConsumer<String> init;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param rmtNodeId Remote node id to request snapshots.
+         * @param tmpWorkDir Temporary work directory.
+         * @param init The initialization task procedure.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFutureTask(
+            UUID rmtNodeId,
+            File tmpWorkDir,
+            IgniteThrowableConsumer<String> init,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(tmpWorkDir.getAbsolutePath(), rqId);
+            this.rmtNodeId = rmtNodeId;
+            this.init = init;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                init.accept(rqId);
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFutureTask future = (RemoteSnapshotFutureTask)o;
+
+            return Objects.equals(rqId, future.rqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(rqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFutureTask.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private final AtomicReference<RemoteSnapshotFutureTask> active = new AtomicReference<>();
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFutureTask> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param task The supplier of a new task. May produce <tt>null</tt> value if the queue is empty.
+         */
+        public void submit(Supplier<RemoteSnapshotFutureTask> task) {
+            RemoteSnapshotFutureTask curr = active.get();
+            RemoteSnapshotFutureTask next = task.get();
+
+            if ((curr == null || curr.isDone()) && active.compareAndSet(curr, next)) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue::poll));
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.run();
+            }
+            else if (next != null)
+                queue.offer(next);

Review comment:
       There is a possible race here where the {{next}} task will never be executed: if the active task completes, and its completion listener drains a still-empty queue, after this thread has observed it as not done but before {{queue.offer(next)}} runs, then {{next}} sits in the queue with nothing left to poll it.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -276,21 +480,12 @@ protected void cleanup() throws IgniteCheckedException {
 
                 return;
             }
-
-            Collection<String> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
-                node -> node.consistentId().toString(), (node) -> CU.baselineNode(node, ctx.state().clusterState()));
-
-            snpBltNodes.removeAll(bltNodes);
-
-            if (!snpBltNodes.isEmpty()) {
-                finishProcess(fut0.rqId, new IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
-                    "restore a cache group are missing [nodeId(s)=" + snpBltNodes + ", snapshot=" + snpName + ']'));
-
-                return;
-            }
+            Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),

Review comment:
       `Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());`
   ?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -781,162 +918,181 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs)
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+            }
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
+        opCtx0.sameTop = sameTopology(opCtx0.nodes, opCtx0.metasPerNode);
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            cacheStartProc.start(reqId, reqId);
+            preloadProc.start(reqId, reqId);
     }
 
     /**
-     * @param reqId Request ID.
-     * @return Result future.
+     * @param nodes Nodes that have to alive to complete restore operation.
+     * @return {@code true} if the snapshot and current cluster topologies are compatible.
      */
-    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
-        if (ctx.clientNode())
-            return new GridFinishedFuture<>();
+    private boolean sameTopology(Set<UUID> nodes, Map<UUID, ArrayList<SnapshotMetadata>> metas) {
+        Set<String> clusterBlts = nodes.stream()
+            .map(n -> ctx.discovery().node(n).consistentId().toString())
+            .collect(Collectors.toSet());
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+        // Snapshot baseline nodes.
+        List<SnapshotMetadata> nodeMetas = F.first(metas.values());
 
-        Throwable err = opCtx0.err.get();
+        if (nodeMetas == null)
+            return false;
 
-        if (err != null)
-            return new GridFinishedFuture<>(err);
+        Set<String> snpBlts = F.first(nodeMetas).baselineNodes();
 
-        if (!U.isLocalNodeCoordinator(ctx.discovery()))
-            return new GridFinishedFuture<>();
+        if (!clusterBlts.containsAll(snpBlts))
+            return false;
 
-        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+        // Each node must have its own local copy of a snapshot.
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+            String consId = ctx.discovery().node(e.getKey()).consistentId().toString();
 
-        if (log.isInfoEnabled()) {
-            log.info("Starting restored caches " +
-                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
-                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+            // Local node metadata is always on the first place of a list.
+            SnapshotMetadata meta = F.first(e.getValue());
+
+            if (meta == null || !meta.consistentId().equals(consId))
+                return false;
         }
 
-        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
-        // the cluster during the cache startup, the whole procedure will be rolled back.
-        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+        return true;
     }
 
     /**
-     * @param reqId Request ID.
-     * @param res Results.
-     * @param errs Errors.
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
      */
-    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
         if (ctx.clientNode())
-            return;
+            return new GridFinishedFuture<>();
 
         SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
 
-        Exception failure = errs.values().stream().findFirst().
-            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
 
-        if (failure == null) {
-            finishProcess(reqId);
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
 
-            return;
-        }
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
 
-        opCtx0.err.compareAndSet(null, failure);
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
 
-        if (U.isLocalNodeCoordinator(ctx.discovery()))
-            rollbackRestoreProc.start(reqId, reqId);
-    }
+            // Guard all snapshot restore operations with the metastorage keys.
+            updateMetastorageRecoveryKeys(opCtx0.dirs, false);
 
-    /**
-     * @param reqNodes Set of required topology nodes.
-     * @param respNodes Set of responding topology nodes.
-     * @return Error, if no response was received from the required topology node.
-     */
-    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
-        if (!respNodes.containsAll(reqNodes)) {
-            Set<UUID> leftNodes = new HashSet<>(reqNodes);
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
 
-            leftNodes.removeAll(respNodes);
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
 
-            return new ClusterTopologyCheckedException(OP_REJECT_MSG +
-                "Required node has left the cluster [nodeId=" + leftNodes + ']');
-        }
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
 
-        return null;
-    }
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
 
-    /**
-     * @param reqId Request ID.
-     * @return Result future.
-     */
-    private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
-        if (ctx.clientNode())
-            return new GridFinishedFuture<>();
+            CompletableFuture<Void> partFut = CompletableFuture.completedFuture(null);
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+            if (opCtx0.sameTop) {
+                if (log.isInfoEnabled()) {
+                    log.info("The snapshot was taken on the same cluster topology. It may by copied prior to starting cache groups " +
+                        "[snpName=" + opCtx0.snpName +
+                        ", dirs=" + opCtx0.dirs.stream().map(File::getName).collect(Collectors.toList()) + ']');
+                }
 
-        if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
-            return new GridFinishedFuture<>();
+                List<CompletableFuture<Path>> futs = new ArrayList<>();
+                String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
 
-        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+                for (File cacheDir : opCtx0.dirs) {
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+                        Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
 
-        synchronized (this) {
-            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+                    if (!snpCacheDir.exists())
+                        throw new IgniteCheckedException("Snapshot directory doesn't exist: " + snpCacheDir);
 
-            try {
-                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
-                    if (log.isInfoEnabled()) {
-                        log.info("Removing restored cache directories [reqId=" + reqId +
-                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
-                    }
+                    File tmpCacheDir = formatTmpDirName(cacheDir);
 
-                    IgniteCheckedException ex = null;
+                    tmpCacheDir.mkdir();
 
-                    for (File cacheDir : opCtx0.dirs) {
-                        File tmpCacheDir = formatTmpDirName(cacheDir);
+                    for (File snpFile : snpCacheDir.listFiles()) {
+                        CompletableFuture<Path> fut;
 
-                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+                        copyFileLocal(snpMgr, opCtx0, snpFile,
+                            Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName()),
+                            fut = new CompletableFuture<>());
 
-                            ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
-                        }
+                        futs.add(fut);
+                    }
+                }
 
-                        if (cacheDir.exists() && !U.delete(cacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+                int size = futs.size();
 
-                            ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
+                partFut = CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+                    .runAfterBothAsync(metaFut, () -> {
+                        try {
+                            if (opCtx0.stopChecker.getAsBoolean())
+                                throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
+
+                            for (File src : opCtx0.dirs)
+                                Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
                         }
-                    }
+                        catch (Throwable e) {
+                            opCtx0.errHnd.accept(e);
+                        }
+                    }, snpMgr.snapshotExecutorService())
+                    // Complete the local rebalance cache future, since the data is loaded.
+                    .thenAccept(r -> opCtx0.cachesLoadFut.onDone(true));
+            }
 
-                    if (ex != null)
-                        retFut.onDone(ex);
-                    else
+            allOfFailFast(Arrays.asList(metaFut, partFut))

Review comment:
       partFut already contains metaFut (since it is created via runAfterBothAsync)
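
       One way to act on that, sketched: default `partFut` to `metaFut` (instead of an already-completed future) and wait on `partFut` alone, since in the `sameTop` branch it already gates on `metaFut` via `runAfterBothAsync`:

           CompletableFuture<Void> partFut = metaFut; // !sameTop: only metadata work remains

           if (opCtx0.sameTop) {
               // ... existing allOf(...).runAfterBothAsync(metaFut, ...) chain ...
           }

           partFut.whenComplete((res, t) -> {
               // ... same completion handling as before ...
           });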

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
                 " operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
         }
 
+        updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
         finishProcess(reqId, opCtx0.err.get());
     }
 
+    /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param snpFile Snapshot file to copy.
+     * @param target Destination path.
+     * @param fut Future which will handle the copy results.
+     */
+    private static void copyFileLocal(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File snpFile,
+        Path target,
+        CompletableFuture<Path> fut

Review comment:
       Let's just return the future here instead of passing it as an out-parameter, and skip the extra nested-future logic (do it wherever it's actually needed)
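
       A minimal sketch of the suggested signature (same copy logic, but the method owns and returns the future):

           private static CompletableFuture<Path> copyFileLocal(
               IgniteSnapshotManager mgr,
               SnapshotRestoreContext opCtx,
               File snpFile,
               Path target
           ) {
               CompletableFuture<Path> fut = new CompletableFuture<>();

               // ... existing copy logic, completing `fut` with `target` or an error ...

               return fut;
           }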

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -781,162 +918,181 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs)
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+            }
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
+        opCtx0.sameTop = sameTopology(opCtx0.nodes, opCtx0.metasPerNode);
 
         if (U.isLocalNodeCoordinator(ctx.discovery()))
-            cacheStartProc.start(reqId, reqId);
+            preloadProc.start(reqId, reqId);
     }
 
     /**
-     * @param reqId Request ID.
-     * @return Result future.
+     * @param nodes Nodes that have to alive to complete restore operation.
+     * @return {@code true} if the snapshot and current cluster topologies are compatible.
      */
-    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
-        if (ctx.clientNode())
-            return new GridFinishedFuture<>();
+    private boolean sameTopology(Set<UUID> nodes, Map<UUID, ArrayList<SnapshotMetadata>> metas) {
+        Set<String> clusterBlts = nodes.stream()
+            .map(n -> ctx.discovery().node(n).consistentId().toString())
+            .collect(Collectors.toSet());
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+        // Snapshot baseline nodes.
+        List<SnapshotMetadata> nodeMetas = F.first(metas.values());
 
-        Throwable err = opCtx0.err.get();
+        if (nodeMetas == null)
+            return false;
 
-        if (err != null)
-            return new GridFinishedFuture<>(err);
+        Set<String> snpBlts = F.first(nodeMetas).baselineNodes();
 
-        if (!U.isLocalNodeCoordinator(ctx.discovery()))
-            return new GridFinishedFuture<>();
+        if (!clusterBlts.containsAll(snpBlts))
+            return false;
 
-        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+        // Each node must have its own local copy of a snapshot.
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+            String consId = ctx.discovery().node(e.getKey()).consistentId().toString();
 
-        if (log.isInfoEnabled()) {
-            log.info("Starting restored caches " +
-                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
-                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+            // Local node metadata is always on the first place of a list.
+            SnapshotMetadata meta = F.first(e.getValue());
+
+            if (meta == null || !meta.consistentId().equals(consId))
+                return false;
         }
 
-        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
-        // the cluster during the cache startup, the whole procedure will be rolled back.
-        return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true, false, IgniteUuid.fromUuid(reqId));
+        return true;
     }
 
     /**
-     * @param reqId Request ID.
-     * @param res Results.
-     * @param errs Errors.
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
      */
-    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
         if (ctx.clientNode())
-            return;
+            return new GridFinishedFuture<>();
 
         SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
 
-        Exception failure = errs.values().stream().findFirst().
-            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
 
-        if (failure == null) {
-            finishProcess(reqId);
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
 
-            return;
-        }
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
 
-        opCtx0.err.compareAndSet(null, failure);
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
 
-        if (U.isLocalNodeCoordinator(ctx.discovery()))
-            rollbackRestoreProc.start(reqId, reqId);
-    }
+            // Guard all snapshot restore operations with the metastorage keys.
+            updateMetastorageRecoveryKeys(opCtx0.dirs, false);
 
-    /**
-     * @param reqNodes Set of required topology nodes.
-     * @param respNodes Set of responding topology nodes.
-     * @return Error, if no response was received from the required topology node.
-     */
-    private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
-        if (!respNodes.containsAll(reqNodes)) {
-            Set<UUID> leftNodes = new HashSet<>(reqNodes);
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
 
-            leftNodes.removeAll(respNodes);
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
 
-            return new ClusterTopologyCheckedException(OP_REJECT_MSG +
-                "Required node has left the cluster [nodeId=" + leftNodes + ']');
-        }
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
 
-        return null;
-    }
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
 
-    /**
-     * @param reqId Request ID.
-     * @return Result future.
-     */
-    private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
-        if (ctx.clientNode())
-            return new GridFinishedFuture<>();
+            CompletableFuture<Void> partFut = CompletableFuture.completedFuture(null);
 
-        SnapshotRestoreContext opCtx0 = opCtx;
+            if (opCtx0.sameTop) {
+                if (log.isInfoEnabled()) {
+                    log.info("The snapshot was taken on the same cluster topology. It may by copied prior to starting cache groups " +
+                        "[snpName=" + opCtx0.snpName +
+                        ", dirs=" + opCtx0.dirs.stream().map(File::getName).collect(Collectors.toList()) + ']');
+                }
 
-        if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
-            return new GridFinishedFuture<>();
+                List<CompletableFuture<Path>> futs = new ArrayList<>();
+                String pdsFolderName = ctx.pdsFolderResolver().resolveFolders().folderName();
 
-        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+                for (File cacheDir : opCtx0.dirs) {
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+                        Paths.get(databaseRelativePath(pdsFolderName), cacheDir.getName()).toString());
 
-        synchronized (this) {
-            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+                    if (!snpCacheDir.exists())
+                        throw new IgniteCheckedException("Snapshot directory doesn't exist: " + snpCacheDir);
 
-            try {
-                ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
-                    if (log.isInfoEnabled()) {
-                        log.info("Removing restored cache directories [reqId=" + reqId +
-                            ", snapshot=" + opCtx0.snpName + ", dirs=" + opCtx0.dirs + ']');
-                    }
+                    File tmpCacheDir = formatTmpDirName(cacheDir);
 
-                    IgniteCheckedException ex = null;
+                    tmpCacheDir.mkdir();
 
-                    for (File cacheDir : opCtx0.dirs) {
-                        File tmpCacheDir = formatTmpDirName(cacheDir);
+                    for (File snpFile : snpCacheDir.listFiles()) {
+                        CompletableFuture<Path> fut;
 
-                        if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove temp directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+                        copyFileLocal(snpMgr, opCtx0, snpFile,
+                            Paths.get(tmpCacheDir.getAbsolutePath(), snpFile.getName()),
+                            fut = new CompletableFuture<>());
 
-                            ex = new IgniteCheckedException("Unable to remove temporary cache directory " + cacheDir);
-                        }
+                        futs.add(fut);
+                    }
+                }
 
-                        if (cacheDir.exists() && !U.delete(cacheDir)) {
-                            log.error("Unable to perform rollback routine completely, cannot remove cache directory " +
-                                "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", dir=" + cacheDir + ']');
+                int size = futs.size();
 
-                            ex = new IgniteCheckedException("Unable to remove cache directory " + cacheDir);
+                partFut = CompletableFuture.allOf(futs.toArray(new CompletableFuture[size]))
+                    .runAfterBothAsync(metaFut, () -> {
+                        try {
+                            if (opCtx0.stopChecker.getAsBoolean())
+                                throw new IgniteInterruptedException("The operation has been stopped on temporary directory switch.");
+
+                            for (File src : opCtx0.dirs)
+                                Files.move(formatTmpDirName(src).toPath(), src.toPath(), StandardCopyOption.ATOMIC_MOVE);
                         }
-                    }
+                        catch (Throwable e) {
+                            opCtx0.errHnd.accept(e);
+                        }
+                    }, snpMgr.snapshotExecutorService())
+                    // Complete the local rebalance cache future, since the data is loaded.
+                    .thenAccept(r -> opCtx0.cachesLoadFut.onDone(true));
+            }
 
-                    if (ex != null)
-                        retFut.onDone(ex);
-                    else
+            allOfFailFast(Arrays.asList(metaFut, partFut))
+                .whenComplete((res, t) -> {
+                    Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
+
+                    if (t0 == null) {

Review comment:
       redundant {}

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
-        if (!errs.isEmpty()) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        opCtx0.errHnd.accept(failure);
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            cacheStartProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Throwable err = opCtx0.err.get();
+
+        if (err != null)
+            return new GridFinishedFuture<>(err);
+
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return opCtx0.cachesLoadFut;
+
+        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting restored caches " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+        }
+
+        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
+        // the cluster during the cache startup, the whole procedure will be rolled back.
+        GridCompoundFuture<Boolean, Boolean> awaitBoth = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<Boolean> cacheStartFut = ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+            true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+        // This is required for the rollback procedure to execute the cache groups stop operation.
+        cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error() == null));
+
+        // Convert exception to the RestoreCacheStartException to propagate to other nodes over the distributed process.
+        awaitBoth.add(chainCacheStartException(cacheStartFut));
+        awaitBoth.add(opCtx0.cachesLoadFut);
+
+        awaitBoth.markInitialized();
+
+        return awaitBoth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        if (F.isEmpty(grpIdsToStart))
+            return;
+
+        assert opCtx0 != null;
+        assert !opCtx0.sameTop : "WAL must be disabled only for caches restoring from snapshot taken on another cluster: " + opCtx0;
+
+        // This is happened on the exchange which has been initiated by a dynamic cache start message and intend to switch
+        // off the WAL for cache groups loading from a snapshot.
+        for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g -> grpIdsToStart.contains(g.groupId()))) {
+            assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+            // Check partitions have not even been created yet, so the PartitionMetaStateRecord won't be logged to the WAL.
+            for (int p = 0; p < grp.topology().partitions(); p++)
+                assert grp.topology().localPartition(p) == null : p;
+
+            grp.localWalEnabled(false, true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // This will be called after the processCacheStopRequestOnExchangeDone happens.
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (ctx.clientNode() || fut == null || fut.exchangeActions() == null || opCtx0 == null)
+            return;
+
+        Set<String> grpNamesToStop = fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+                requirePartitionLoad(ccfg, uuid, opCtx0))
+            .stream()
+            .map(g -> g.descriptor().cacheOrGroupName())
+            .collect(Collectors.toSet());
+
+        if (F.isEmpty(grpNamesToStop))
+            return;
+
+        assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+        opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        if (failure == null) {
+            if (opCtx0.sameTop) {
+                finishProcess(reqId);
+
+                updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+            }
+            else
+                lateAffProc.start(reqId, reqId);

Review comment:
       Shouldn't we start this process only on the coordinator? Why are the other processes in `finish...` started only on the coordinator, but not this one?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
-        if (!errs.isEmpty()) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        opCtx0.errHnd.accept(failure);
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            cacheStartProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Throwable err = opCtx0.err.get();
+
+        if (err != null)
+            return new GridFinishedFuture<>(err);
+
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return opCtx0.cachesLoadFut;
+
+        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting restored caches " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+        }
+
+        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
+        // the cluster during the cache startup, the whole procedure will be rolled back.
+        GridCompoundFuture<Boolean, Boolean> awaitBoth = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<Boolean> cacheStartFut = ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+            true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+        // This is required for the rollback procedure to execute the cache groups stop operation.
+        cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error() == null));
+
+        // Convert exception to the RestoreCacheStartException to propagate to other nodes over the distributed process.
+        awaitBoth.add(chainCacheStartException(cacheStartFut));
+        awaitBoth.add(opCtx0.cachesLoadFut);
+
+        awaitBoth.markInitialized();
+
+        return awaitBoth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        if (F.isEmpty(grpIdsToStart))
+            return;
+
+        assert opCtx0 != null;
+        assert !opCtx0.sameTop : "WAL must be disabled only for caches restoring from snapshot taken on another cluster: " + opCtx0;
+
+        // This is happened on the exchange which has been initiated by a dynamic cache start message and intend to switch
+        // off the WAL for cache groups loading from a snapshot.
+        for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g -> grpIdsToStart.contains(g.groupId()))) {
+            assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+            // Check partitions have not even been created yet, so the PartitionMetaStateRecord won't be logged to the WAL.
+            for (int p = 0; p < grp.topology().partitions(); p++)
+                assert grp.topology().localPartition(p) == null : p;
+
+            grp.localWalEnabled(false, true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // This will be called after the processCacheStopRequestOnExchangeDone happens.
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (ctx.clientNode() || fut == null || fut.exchangeActions() == null || opCtx0 == null)
+            return;
+
+        Set<String> grpNamesToStop = fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+                requirePartitionLoad(ccfg, uuid, opCtx0))
+            .stream()
+            .map(g -> g.descriptor().cacheOrGroupName())
+            .collect(Collectors.toSet());
+
+        if (F.isEmpty(grpNamesToStop))
+            return;
+
+        assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+        opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        if (failure == null) {
+            if (opCtx0.sameTop) {
+                finishProcess(reqId);
+
+                updateMetastorageRecoveryKeys(opCtx0.dirs, true);

Review comment:
       Shouldn't we update the metastorage first and then complete the future?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
                 " operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
         }
 
+        updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
         finishProcess(reqId, opCtx0.err.get());
     }
 
+    /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param snpFile Snapshot file to copy.
+     * @param target Destination path.
+     * @param fut Future which will handle the copy results.
+     */
+    private static void copyFileLocal(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File snpFile,
+        Path target,
+        CompletableFuture<Path> fut
+    ) {
+        CompletableFuture.supplyAsync(
+            () -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, target.toFile(), snpFile.length());
+
+                return target;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+            .whenComplete((res, t) -> {
+                if (t == null)
+                    fut.complete(res);
+                else
+                    fut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param dirs List of keys to process.
+     * @param remove {@code} if the keys must be removed.
+     */
+    private void updateMetastorageRecoveryKeys(Collection<File> dirs, boolean remove) {
+        for (File dir : dirs) {
+            ctx.cache().context().database().checkpointReadLock();
+
+            String mKey = RESTORE_KEY_PREFIX + dir.getName();
+
+            try {
+                if (remove)
+                    metaStorage.remove(mKey);
+                else
+                    metaStorage.write(mKey, true);
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Updating the metastorage crash-recovery guard key fails [remove=" + remove +
+                    "dir=" + dir.getAbsolutePath() + ']');
+            }
+            finally {
+                ctx.cache().context().database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * @param affCache Affinity cache.
+     * @param node Cluster node to get assigned partitions.
+     * @return The set of partitions assigned to the given node.
+     */
+    private static <T> Set<T> affinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory
+    ) {
+        return IntStream.range(0, affCache.partitions())
+            .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
+            .mapToObj(factory)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param action Action to execute.
+     * @param ex Consumer which accepts exceptional execution result.
+     */
+    private static <T> void handleException(Callable<T> action, Consumer<Throwable> ex) {
+        try {
+            action.call();
+        }
+        catch (Throwable t) {
+            ex.accept(t);
+        }
+    }
+
+    /**
+     * @param futs Map of futures to complete.
+     * @param filter Filtering collection.
+     * @param ex Exception.
+     */
+    private static void completeExceptionally(
+        Collection<Set<PartitionRestoreLifecycleFuture>> futs,
+        Map<Integer, Set<Integer>> filter,
+        Throwable ex
+    ) {
+        futs.stream()
+            .flatMap(Collection::stream)
+            .filter(e -> filter.containsKey(e.grp.groupId()))
+            .collect(Collectors.toList())
+            .forEach(f -> f.completeExceptionally(ex));
+    }
+
+    /**
+     * @param cacheStartFut The cache started future to wrap exception if need.
+     * @param <T> Result future type.
+     * @return Future which completes with wrapped exception if it occurred.
+     */
+    private static <T> IgniteInternalFuture<T> chainCacheStartException(IgniteInternalFuture<T> cacheStartFut) {
+        GridFutureAdapter<T> out = new GridFutureAdapter<>();
+
+        cacheStartFut.listen(f -> {
+            if (f.error() == null)
+                out.onDone(f.result());
+            else
+                out.onDone(new RestoreCacheStartException(f.error()));
+        });
+
+        return out;
+    }
+
+    /**
+     * @param lcs Collection of partition context.
+     * @param partId Partition id to find.
+     * @return Load future.
+     */
+    private static @Nullable CompletableFuture<Path> findLoadFuture(Set<PartitionRestoreLifecycleFuture> lcs, int partId) {
+        return ofNullable(F.find(lcs, null, (IgnitePredicate<? super PartitionRestoreLifecycleFuture>)f -> f.partId == partId))
+            .map(c -> c.loaded)
+            .orElse(null);
+    }
+
+    /**
+     * @param fut Current exchange future.
+     * @param ctx Current snapshot restore context.
+     * @return The set of cache groups needs to be processed.
+     */
+    private Set<Integer> getCachesLoadingFromSnapshot(GridDhtPartitionsExchangeFuture fut, SnapshotRestoreContext ctx) {
+        if (fut == null || fut.exchangeActions() == null || ctx == null)
+            return Collections.emptySet();
+
+        return fut.exchangeActions().cacheGroupsToStart((ccfg, uuid) -> requirePartitionLoad(ccfg, uuid, ctx))
+            .stream()
+            .map(g -> g.descriptor().groupId())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param futs Collection of futures to chain.
+     * @param <T> Result type.
+     * @return Completable future waits for all of.
+     */
+    private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOfFailFast(Collection<T> futs) {

Review comment:
       I'm not sure it's safe to use such an approach. After the future completes, the process can go further and clear the snapshot context, but some futures in the executor are still being processed in the background, and this can lead to unpredictable behaviour. Perhaps it's better to share and analyze some common flag across the futures to fail fast (`opCtx.err`, for example).
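
   A minimal sketch of the hazard, assuming `allOfFailFast` looks roughly like the helper below (an assumption, not the PR's actual implementation): the aggregate future fails fast, but tasks still queued or running know nothing about it, which is why a shared flag such as `opCtx.err` checked inside each task is needed.

   ```java
   import java.util.Collection;
   import java.util.concurrent.CompletableFuture;

   class FailFastSketch {
       /**
        * Completes exceptionally as soon as any input future fails, but the
        * remaining tasks keep running on their executor: fail-fast only
        * short-circuits the aggregate future, not the background work.
        */
       static CompletableFuture<Void> allOfFailFast(Collection<? extends CompletableFuture<?>> futs) {
           CompletableFuture<Void> res =
               CompletableFuture.allOf(futs.toArray(new CompletableFuture[0]));

           for (CompletableFuture<?> f : futs)
               f.whenComplete((r, t) -> {
                   if (t != null)
                       res.completeExceptionally(t);
               });

           return res;
       }
   }
   ```

   So a caller that clears the snapshot context right after the aggregate fails can race with copy tasks that are still writing into the temporary directories.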

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,8 +3485,12 @@ else if (task instanceof ForceRebalanceExchangeTask) {
                             if (task instanceof ForceRebalanceExchangeTask)
                                 forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                            for (CacheGroupContext grp : assignsSet.descendingSet()) {
-                                boolean disableRebalance = cctx.snapshot().partitionsAreFrozen(grp);
+                        if (forcedRebFut == null)

Review comment:
       Wrong indent

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1435,13 +1524,62 @@ public static boolean isSnapshotOperation(DiscoveryEvent evt) {
         }
     }
 
+    /**
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param rmtNodeId The remote node to connect to.
+     * @param partHnd Received partition handler.
+     */
+    public IgniteInternalFuture<Void> requestRemoteSnapshotAsync(

Review comment:
       It's not actually async; `RemoteSnapshotFutureTask` is executed from the same thread.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
                 " operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ", node(s)=" + leftNodes + ']');
         }
 
+        updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
         finishProcess(reqId, opCtx0.err.get());
     }
 
+    /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param snpFile Snapshot file to copy.
+     * @param target Destination path.
+     * @param fut Future which will handle the copy results.
+     */
+    private static void copyFileLocal(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File snpFile,
+        Path target,
+        CompletableFuture<Path> fut
+    ) {
+        CompletableFuture.supplyAsync(
+            () -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, target.toFile(), snpFile.length());
+
+                return target;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+            .whenComplete((res, t) -> {
+                if (t == null)
+                    fut.complete(res);
+                else
+                    fut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param dirs List of keys to process.
+     * @param remove {@code} if the keys must be removed.
+     */
+    private void updateMetastorageRecoveryKeys(Collection<File> dirs, boolean remove) {
+        for (File dir : dirs) {
+            ctx.cache().context().database().checkpointReadLock();
+
+            String mKey = RESTORE_KEY_PREFIX + dir.getName();
+
+            try {
+                if (remove)
+                    metaStorage.remove(mKey);
+                else
+                    metaStorage.write(mKey, true);
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Updating the metastorage crash-recovery guard key fails [remove=" + remove +
+                    "dir=" + dir.getAbsolutePath() + ']');
+            }
+            finally {
+                ctx.cache().context().database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * @param affCache Affinity cache.
+     * @param node Cluster node to get assigned partitions.
+     * @return The set of partitions assigned to the given node.
+     */
+    private static <T> Set<T> affinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory

Review comment:
       This parameter looks strange; it's used only once.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
      * @param res Results.
      * @param errs Errors.
      */
-    private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+    private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
         if (ctx.clientNode())
             return;
 
-        if (!errs.isEmpty()) {
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        opCtx0.errHnd.accept(failure);
+
+        if (failure != null) {
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+            if (U.isLocalNodeCoordinator(ctx.discovery()))
+                rollbackRestoreProc.start(reqId, reqId);
+
+            return;
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            cacheStartProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Throwable err = opCtx0.err.get();
+
+        if (err != null)
+            return new GridFinishedFuture<>(err);
+
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return opCtx0.cachesLoadFut;
+
+        Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting restored caches " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+                ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName()) + ']');
+        }
+
+        // We set the topology node IDs required to successfully start the cache, if any of the required nodes leave
+        // the cluster during the cache startup, the whole procedure will be rolled back.
+        GridCompoundFuture<Boolean, Boolean> awaitBoth = new GridCompoundFuture<>();
+
+        IgniteInternalFuture<Boolean> cacheStartFut = ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+            true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+        // This is required for the rollback procedure to execute the cache groups stop operation.
+        cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error() == null));
+
+        // Convert exception to the RestoreCacheStartException to propagate to other nodes over the distributed process.
+        awaitBoth.add(chainCacheStartException(cacheStartFut));
+        awaitBoth.add(opCtx0.cachesLoadFut);
+
+        awaitBoth.markInitialized();
+
+        return awaitBoth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        if (F.isEmpty(grpIdsToStart))
+            return;
+
+        assert opCtx0 != null;
+        assert !opCtx0.sameTop : "WAL must be disabled only for caches restoring from snapshot taken on another cluster: " + opCtx0;
+
+        // This is happened on the exchange which has been initiated by a dynamic cache start message and intend to switch
+        // off the WAL for cache groups loading from a snapshot.
+        for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g -> grpIdsToStart.contains(g.groupId()))) {
+            assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+            // Check partitions have not even been created yet, so the PartitionMetaStateRecord won't be logged to the WAL.
+            for (int p = 0; p < grp.topology().partitions(); p++)
+                assert grp.topology().localPartition(p) == null : p;
+
+            grp.localWalEnabled(false, true);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        // This will be called after the processCacheStopRequestOnExchangeDone happens.
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (ctx.clientNode() || fut == null || fut.exchangeActions() == null || opCtx0 == null)
+            return;
+
+        Set<String> grpNamesToStop = fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+                requirePartitionLoad(ccfg, uuid, opCtx0))
+            .stream()
+            .map(g -> g.descriptor().cacheOrGroupName())
+            .collect(Collectors.toSet());
+
+        if (F.isEmpty(grpNamesToStop))
+            return;
+
+        assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+        opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Exception failure = errs.values().stream().findFirst().
+            orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+        if (failure == null) {
+            if (opCtx0.sameTop) {
+                finishProcess(reqId);
+
+                updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+            }
+            else
+                lateAffProc.start(reqId, reqId);
+
+            return;
+        }
+
+        opCtx0.errHnd.accept(failure);
+
+        ClusterNode crd = U.oldest(ctx.discovery().aliveServerNodes(), null);
+
+        // Caches were not even been started and rollback already occurred during PME, so they are not even stared.
+        if (X.hasCause(errs.get(crd.id()), RestoreCacheStartException.class))
+            opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            rollbackRestoreProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param grps Ordered list of cache groups sorted by priority.
+     * @param exchFut Exchange future.
+     */
+    public void onRebalanceReady(Set<CacheGroupContext> grps, @Nullable GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.clientNode())
+            return;
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Set<Integer> exchGrpIds = getCachesLoadingFromSnapshot(exchFut, opCtx0);
+
+        Set<CacheGroupContext> filtered = grps.stream()
+            .filter(Objects::nonNull)
+            .filter(g -> exchGrpIds.contains(g.groupId()))
+            .collect(Collectors.toSet());
+
+        // Restore requests has been already processed at previous exchange.
+        if (filtered.isEmpty())
+            return;
+
+        assert opCtx0 != null;
+
+        // First preload everything from the local node.
+        List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+        Map<Integer, Set<Integer>> notScheduled = new HashMap<>();
+
+        // Register partitions to be processed.
+        for (CacheGroupContext grp : filtered) {
+            if (F.isEmpty(locMetas))
+                break;
+
+            notScheduled.put(grp.groupId(),
+                affinityPartitions(grp.affinity(), ctx.cache().context().localNode(), Integer::new));
+
+            Set<Integer> leftParts = notScheduled.get(grp.groupId());

Review comment:
       The `get` method is called right after `put` on the same map; the computed set can be kept in a local variable and put once instead.
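
   A minimal sketch of that cleanup, using the names from the diff above:

   ```java
   // Compute the partition set once; register and iterate the same reference.
   Set<Integer> leftParts =
       affinityPartitions(grp.affinity(), ctx.cache().context().localNode(), Integer::new);

   notScheduled.put(grp.groupId(), leftParts);
   ```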







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9321: IGNITE-14744 Partition swap on checkpoint

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9321:
URL: https://github.com/apache/ignite/pull/9321#discussion_r715677422



##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessorBinarizable.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Nothing changed except this line

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
##########
@@ -100,7 +103,20 @@ public boolean clientOnlyExchange() {
      * @return New caches start requests.
      */
     public Collection<CacheActionData> cacheStartRequests() {
-        return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
+        return cacheStartRequests((ccfg, uuid) -> true);
+    }
+
+    /**
+     * @param filter Cache start requests filtering predicate.
+     * @return New caches start requests.
+     */
+    public Collection<CacheActionData> cacheStartRequests(BiPredicate<CacheConfiguration<?, ?>, @Nullable UUID> filter) {

Review comment:
       This method seems redundant. It is used with a filter only once, but it introduces a performance penalty for usages without filtering. It's better to leave `cacheStartRequests()` without stream creation and do the filtering only in `GridQueryProcessor`. Or, at least, don't reuse `cacheStartRequests(filter)` in `cacheStartRequests()`.
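
   For reference, keeping the no-arg overload allocation-free would mean restoring the line the diff removed:

   ```java
   public Collection<CacheActionData> cacheStartRequests() {
       return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
   }
   ```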

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {
+        assert cctx.database().checkpointLockIsHeldByThread();
+        assert src.toFile().exists();
+        assert tag >= 0;
+
+        CacheStoreHolder holder = getHolder(grpId);
+
+        if (holder == null)
+            throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
+                "(cache has not been started): " + grpId);
+
+        CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptor(grpId);
+        DataRegion region = cctx.database().dataRegion(desc.config().getDataRegionName());
+        PageMetrics metrics = region.metrics().cacheGrpPageMetrics(desc.groupId());
+        FileVersionCheckingFactory factory = getPageStoreFactory(grpId, desc.config().isEncryptionEnabled());
+        FilePageStore pageStore = (FilePageStore)getStore(grpId, partId);
+
+        boolean exists = pageStore.exists();
+
+        if (exists)
+            throw new IgniteCheckedException("Previous partition page store must be truncated first: " + partId);
+
+        if (desc == null)
+            throw new IgniteCheckedException("Cache group with given id doesn't exists: " + grpId);
+
+        try {
+            Files.move(src,
+                getPartitionFilePath(cacheWorkDir(desc.config()), partId),
+                StandardCopyOption.ATOMIC_MOVE);
+
+            // Previous page stores may be used by other processes. The link to the instance of a PageStore available
+            // for may internal components, so the best way to share the 'recreation' status is to close the previous
+            // page store instance and to create a new one.
+            return holder.set(partId,
+                factory.createPageStore(getTypeByPartId(partId),
+                    () -> getPartitionFilePath(desc.config(), partId),
+                    metrics.totalPages()::add,

Review comment:
       Shouldn't we reset the total pages metric before adding the pages of the recreated store? Let's add a test for such a case.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -771,6 +820,14 @@ private CacheStoreHolder initDir(File cacheWorkDir,
         }
     }
 
+    /**
+     * @param ccfg Cache group configuration.
+     * @param partId Partition id.
+     */
+    @NotNull public Path getPartitionFilePath(CacheConfiguration<?, ?> ccfg, int partId) {

Review comment:
       Can be private. Also can be inlined.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -67,7 +70,7 @@
      * since for instance, due to the node filter there is no cache data on node.
      */
     @GridToStringInclude
-    private final Map<Integer, Set<Integer>> locParts = new HashMap<>();
+    private transient Map<Integer, Set<Integer>> locParts = new HashMap<>();

Review comment:
       What's wrong with the default serialization?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -1285,7 +1285,7 @@ private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
 
         assert locPart != null && locPart.reservations() > 0;
 
-        locPart.dataStore().preload();
+        dataStore(locPart).preload();

Review comment:
       Looks like this is redundant.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3397,6 +3408,34 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
         top.globalPartSizes(partSizes);
     }
 
+    /**
+     * @param top Topology to reset all states.
+     */
+    private void resetAllPartitionStates(GridDhtPartitionTopology top) {
+        assert crd.isLocal();
+
+        List<List<ClusterNode>> ideal = cctx.affinity().affinity(top.groupId()).idealAssignmentRaw();
+
+        resetStateByCondition(IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(part -> part, part -> emptySet())),
+            state -> true,
+            () -> AffinityTopologyVersion.NONE, // The last major affinity version will be used (e.g. node left, node join event).
+            top,
+            emptySet(),
+            (partId, nodeId) -> F.transform(ideal.get(partId), ClusterNode::id).contains(nodeId),

Review comment:
       `F.transform` creates an `ArrayList` and copies all elements on each execution; let's avoid this overhead.
   For example, like this: `(partId, nodeId) -> F.exist(ideal.get(partId), n -> n.id().equals(nodeId)),`

##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessor.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Nothing changed except this line

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.
      */
-    private void resetOwnersByCounter(GridDhtPartitionTopology top,
-        Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
-        Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
-        Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
-
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
-            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
-            partSizes.put(e.getKey(), e.getValue().size);
-        }
+    private void resetStateByCondition(
+        Map<Integer, Set<UUID>> partsToReset,
+        Predicate<GridDhtPartitionState> statesToReset,

Review comment:
       IMO `EnumSet` is more readable than `Predicate`, but it's up to you.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3397,6 +3408,34 @@ private void assignPartitionSizes(GridDhtPartitionTopology top) {
         top.globalPartSizes(partSizes);
     }
 
+    /**
+     * @param top Topology to reset all states.
+     */
+    private void resetAllPartitionStates(GridDhtPartitionTopology top) {
+        assert crd.isLocal();
+
+        List<List<ClusterNode>> ideal = cctx.affinity().affinity(top.groupId()).idealAssignmentRaw();
+
+        resetStateByCondition(IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(part -> part, part -> emptySet())),
+            state -> true,
+            () -> AffinityTopologyVersion.NONE, // The last major affinity version will be used (e.g. node left, node join event).
+            top,
+            emptySet(),
+            (partId, nodeId) -> F.transform(ideal.get(partId), ClusterNode::id).contains(nodeId),
+            IntStream.range(0, top.partitions()).boxed().collect(Collectors.toMap(p -> p, p -> 0L)));
+
+        Collection<ClusterNode> affNodes = cctx.discovery().cacheGroupAffinityNodes(top.groupId(), AffinityTopologyVersion.NONE);
+
+        for (ClusterNode node : affNodes) {
+            for (Map.Entry<Integer, GridDhtPartitionState> e : top.partitions(node.id()).map().entrySet()) {
+                if (e.getValue() == GridDhtPartitionState.MOVING)
+                    continue;
+
+                assert false : "Partitions must be set to MOVING state on all nodes [node=" + node.id() + ", state=" + e + ']';
+            }
+        }

Review comment:
       `if (U.assertionsEnabled()) ...`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1408,27 +1460,59 @@ public int pageSize() {
      *
      */
     private static class CacheStoreHolder extends AbstractList<PageStore> {
-        /** Index store. */
-        private final PageStore idxStore;
-
         /** Partition stores. */
-        private final PageStore[] partStores;
+        private final AtomicReferenceArray<PageStore> stores;
 
         /**
+         * @param idxStore Index page store.
+         * @param partStores Partition page stores.
          */
         CacheStoreHolder(PageStore idxStore, PageStore[] partStores) {
-            this.idxStore = requireNonNull(idxStore);
-            this.partStores = requireNonNull(partStores);
+            assert idxStore.type() == PageStore.TYPE_IDX;
+
+            int len = requireNonNull(partStores).length;
+
+            PageStore[] arr = Arrays.copyOf(partStores, len + 1);
+            arr[len] = requireNonNull(idxStore);
+
+            stores = new AtomicReferenceArray<>(arr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PageStore get(int partId) {

Review comment:
       Actually, it's not quite correct. You override the `get(int index)` method of the `List` class with different semantics (`partId` instead of `index`). This method (and `put` too) is used internally by the `List` class (by the iterator, for example), and there will be a problem with the maximum allowed partition count (MAX_PARTITION_ID+1 partitions): the iterator will try to get the partition with index MAX_PARTITION_ID+1 for an INDEX_PARTITION and will fail with an `IllegalArgumentException`.
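
   A sketch of that failure mode (names taken from the diff; the loop is purely illustrative): `AbstractList` drives the overridden `get(int)` with positional indexes, so any implicit iteration turns an index into a bogus partition id.

   ```java
   // AbstractList.iterator() calls get(0) .. get(size() - 1).
   // With MAX_PARTITION_ID + 1 user partitions plus the index store, the last
   // positional index equals MAX_PARTITION_ID + 1, but the overridden
   // get(partId) interprets it as a partition id and throws.
   for (PageStore store : holder) // iterates via get(index) under the hood
       store.exists();
   ```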

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
##########
@@ -930,28 +943,31 @@ public GridDhtLocalPartition getOrCreatePartition(int p) {
         return loc;
     }
 
-    /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
+    /**
+     * @param partId Partition id to create.
+     * @param exit Condition to return partition immediately if it's already created.
+     * @param act Post-processing action for created partition if the previous partition exists.
+     * @return Created partition or the exiting partition.
+     */
+    public GridDhtLocalPartition doForcePartitionCreate(
+        int partId,
+        Predicate<GridDhtPartitionState> exit,
+        BiConsumer<GridDhtPartitionState, GridDhtLocalPartition> act

Review comment:
       It's much harder to read the code if there are a lot of lambdas. Why not just use an `EnumSet` of states and some boolean flag to do `resetUpdateCounter()`?
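
   A hedged sketch of that shape (parameter names are hypothetical), replacing the two lambdas with plain data:

   ```java
   public GridDhtLocalPartition doForcePartitionCreate(
       int partId,
       EnumSet<GridDhtPartitionState> returnIfStateIn, // instead of the 'exit' predicate
       boolean resetUpdCntr                            // instead of the 'act' consumer
   ) {
       GridDhtLocalPartition part = localPartition(partId);

       // The early exit reads as a set-membership check instead of a lambda call.
       if (part != null && returnIfStateIn.contains(part.state()))
           return part;

       // ... creation logic unchanged; reset the update counter when resetUpdCntr is set ...
       return getOrCreatePartition(partId);
   }
   ```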

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.
      */
-    private void resetOwnersByCounter(GridDhtPartitionTopology top,
-        Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
-        Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
-        Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
-
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
-            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
-            partSizes.put(e.getKey(), e.getValue().size);
-        }
+    private void resetStateByCondition(
+        Map<Integer, Set<UUID>> partsToReset,
+        Predicate<GridDhtPartitionState> statesToReset,
+        Supplier<AffinityTopologyVersion> awaitAffVer,

Review comment:
       Why do we need `Supplier` here? Why not just `AffinityTopologyVersion`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
##########
@@ -3876,8 +3893,13 @@ public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFutu
         Set<Integer> cacheIds = emptySet();
 
         if (acts != null) {
-            if (!F.isEmpty(acts.cacheStartRequests())) {
-                cacheIds = acts.cacheStartRequests().stream()
+            // THe index rebuild for restoring caches will be explicitly completed under the restore manager,

Review comment:
       `THe` -> `The`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
##########
@@ -289,26 +308,37 @@ public void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor
     void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
         assert req.resetLostPartitions() : req;
 
-        cachesToResetLostParts = add(cachesToResetLostParts, req, desc);
+        cachesToResetLostParts = add(cachesToResetLostParts, req, desc, null);
     }
 
     /**
      * @param grpDesc Group descriptor.
      */
-    void addCacheGroupToStart(CacheGroupDescriptor grpDesc) {
+    void addCacheGroupToStart(CacheGroupDescriptor grpDesc, @Nullable UUID ownerId) {
         assert grpDesc != null;
 
         if (cacheGrpsToStart == null)
             cacheGrpsToStart = new ArrayList<>();
 
-        cacheGrpsToStart.add(new CacheGroupActionData(grpDesc));
+        cacheGrpsToStart.add(new CacheGroupActionData(grpDesc, false, ownerId));
     }
 
     /**
      * @return Cache groups to start.
      */
     public List<CacheGroupActionData> cacheGroupsToStart() {
-        return cacheGrpsToStart != null ? cacheGrpsToStart : Collections.<CacheGroupActionData>emptyList();
+        return cacheGroupsToStart((ccfg, uuid) -> true);

Review comment:
       Let's keep this method as is (do not reuse `cacheGroupsToStart`) to avoid extra object creation by streams.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
##########
@@ -628,7 +628,7 @@ private void removeCacheData(int cacheId) {
     /** {@inheritDoc} */
     @Nullable @Override public CacheDataRow read(GridCacheContext cctx, KeyCacheObject key)
         throws IgniteCheckedException {
-        CacheDataStore dataStore = dataStore(cctx, key);
+        CacheDataStore dataStore = dataStore(cctx.affinity().partition(key), false);

Review comment:
       Changed back?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -1012,15 +1062,33 @@ else if (dir.getName().startsWith(CACHE_GRP_DIR_PREFIX))
         if (files == null)
             return Collections.emptyList();
 
-        return Arrays.stream(dir.listFiles())
+        return Arrays.stream(files)
             .sorted()
             .filter(File::isDirectory)
-            .filter(f -> f.getName().startsWith(CACHE_DIR_PREFIX) || f.getName().startsWith(CACHE_GRP_DIR_PREFIX) ||
-                f.getName().equals(MetaStorage.METASTORAGE_DIR_NAME))
+            .filter(f -> CACHE_DIR_FILTER.test(f.getName()))
             .filter(f -> names.test(cacheGroupName(f)))
             .collect(Collectors.toList());
     }
 
+    /**
+     * @param dir Directory to check.
+     * @param grpId Cache group id
+     * @return Files that match cache or cache group pattern.
+     */
+    public static File cacheDirectory(File dir, int grpId) {
+        File[] files = dir.listFiles();
+
+        if (files == null)
+            return null;
+
+        return Arrays.stream(files)
+            .filter(File::isDirectory)
+            .filter(f -> CACHE_DIR_FILTER.test(f.getName()))

Review comment:
       Do we really need the metastorage directory here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {

Review comment:
       Perhaps not the most relevant method name. At the very least, a Javadoc comment should be added to note what exactly is recreated.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -133,6 +135,12 @@
     /** */
     public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
 
+    /** */
+    public static final Predicate<String> CACHE_DIR_FILTER = dirName ->

Review comment:
       Since metastorage is not a cache, perhaps it's better to name this predicate something like `DATA_DIR_FILTER`?
   Let's use `Predicate<File>` rather than `Predicate<String>` to avoid another nesting level.
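
   A minimal sketch of both suggestions together (the constant name is the one proposed above):

   ```java
   /** Matches cache, cache group and metastorage data directories. */
   public static final Predicate<File> DATA_DIR_FILTER = dir ->
       dir.getName().startsWith(CACHE_DIR_PREFIX)
           || dir.getName().startsWith(CACHE_GRP_DIR_PREFIX)
           || dir.getName().equals(MetaStorage.METASTORAGE_DIR_NAME);
   ```

   Call sites then shrink from `.filter(f -> CACHE_DIR_FILTER.test(f.getName()))` to `.filter(DATA_DIR_FILTER)`.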

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -594,6 +598,51 @@ public void removeConfigurationChangeListener(BiConsumer<String, File> lsnr) {
         store.truncate(tag);
     }
 
+    /** {@inheritDoc} */
+    @Override public PageStore recreate(int grpId, int partId, int tag, Path src) throws IgniteCheckedException {
+        assert cctx.database().checkpointLockIsHeldByThread();
+        assert src.toFile().exists();
+        assert tag >= 0;
+
+        CacheStoreHolder holder = getHolder(grpId);
+
+        if (holder == null)

Review comment:
       `{ ... }`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3491,31 +3537,32 @@ else if (cntr == maxCntr.cnt)
      * If anyone of OWNING partitions have a counter less than maximum this partition changes state to MOVING forcibly.
      *
      * @param top Topology.
-     * @param maxCntrs Max counter partiton map.
+     * @param awaitAffVer Topology version to wait late affinity assignment to. If <tt>NONE</tt> is used then
+     * the last topology version which requires affinity re-calculation is used.
      * @param haveHistory Set of partitions witch have historical supplier.

Review comment:
       witch -> which

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+abstract class AbstractSnapshotMessage implements Message {
+    /** Unique request id. */
+    private String rqId;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    protected AbstractSnapshotMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param rqId Unique snapshot name.
+     */
+    protected AbstractSnapshotMessage(String rqId) {
+        assert U.alphanumericUnderscore(rqId) : rqId;
+
+        this.rqId = rqId;
+    }
+
+    /**
+     * @return Unique snapshot name.
+     */
+    public String requestId() {

Review comment:
       Why are the method name and the Javadoc different? Can we rename the field/methods to `snapshotName`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -987,14 +1044,7 @@ public void readCacheConfigurations(File dir, Map<String, StoredCacheData> ccfgs
             if (conf.exists() && conf.length() > 0) {
                 StoredCacheData cacheData = readCacheData(conf);
 
-                String cacheName = cacheData.config().getName();
-
-                if (!ccfgs.containsKey(cacheName))
-                    ccfgs.put(cacheName, cacheData);
-                else {
-                    U.warn(log, "Cache with name=" + cacheName + " is already registered, skipping config file "

Review comment:
       Shouldn't we keep this warning?
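
   A minimal sketch of keeping the warning while preserving the simplified flow (the message text is abridged from the removed lines):

   ```java
   StoredCacheData old = ccfgs.putIfAbsent(cacheData.config().getName(), cacheData);

   if (old != null) {
       U.warn(log, "Cache with name=" + cacheData.config().getName() +
           " is already registered, skipping config file.");
   }
   ```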

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().
+            collect(Collectors.toMap(Map.Entry::getKey,
+                e -> new HashSet<>(e.getValue())));
+    }
+
+    /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out all elements in the proper order.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reconstitute the <tt>HashMap</tt> instance of partitions and cache groups from a stream. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = new HashMap<>(size);
+
+        // Read in all elements in the proper order.
+        for (int i = 0; i < size; i++) {
+            int grpId = s.readInt();
+            int total = s.readInt();
+
+            if (total < 0)
+                throw new InvalidObjectException("Illegal size: " + total);
+
+            Set<Integer> parts = new HashSet<>(total);

Review comment:
       `U.newHashSet(total)`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().

Review comment:
       Sometimes you access the `partitions()` method twice per cache group, which is very inefficient: each invocation creates a new stream, a new map, and new sets. If you are afraid of accidental collection modification, it's better to wrap the map in `Collections.unmodifiableMap`/`Collections.unmodifiableSet` once and then always return that unmodifiable instance.
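
       For example (a sketch only; assumes the wrapping happens wherever `locParts` is assigned, i.e. in the constructor and in `readObject()`, and that `java.util.Collections` is imported):

       ```java
       // Build the unmodifiable view once and store it in locParts.
       private static Map<Integer, Set<Integer>> unmodifiable(Map<Integer, Set<Integer>> src) {
           Map<Integer, Set<Integer>> res = U.newHashMap(src.size());

           for (Map.Entry<Integer, Set<Integer>> e : src.entrySet())
               res.put(e.getKey(), Collections.unmodifiableSet(e.getValue()));

           return Collections.unmodifiableMap(res);
       }

       /** @return Cached unmodifiable view; no per-call allocations. */
       public Map<Integer, Set<Integer>> partitions() {
           return locParts;
       }
       ```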

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRequestMessage.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class SnapshotRequestMessage extends AbstractSnapshotMessage {
+    /** Snapshot request message type (value is {@code 177}). */

Review comment:
       `@code 178`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
##########
@@ -155,7 +158,58 @@ public int pageSize() {
      * saved on the local node because some of them may be skipped due to cache node filter).
      */
     public Map<Integer, Set<Integer>> partitions() {
-        return locParts;
+        return locParts.entrySet().stream().
+            collect(Collectors.toMap(Map.Entry::getKey,
+                e -> new HashSet<>(e.getValue())));
+    }
+
+    /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */
+    private void writeObject(java.io.ObjectOutputStream s)
+        throws java.io.IOException {
+        // Write out any hidden serialization.
+        s.defaultWriteObject();
+
+        // Write out size of map.
+        s.writeInt(locParts.size());
+
+        // Write out all elements in the proper order.
+        for (Map.Entry<Integer, Set<Integer>> e : locParts.entrySet()) {
+            s.writeInt(e.getKey());
+            s.writeInt(e.getValue().size());
+
+            for (Integer partId : e.getValue())
+                s.writeInt(partId);
+        }
+    }
+
+    /** Reconstitute the <tt>HashMap</tt> instance of partitions and cache groups from a stream. */
+    private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException {
+        // Read in any hidden serialization.
+        s.defaultReadObject();
+
+        // Read size and verify non-negative.
+        int size = s.readInt();
+
+        if (size < 0)
+            throw new InvalidObjectException("Illegal size: " + size);
+
+        locParts = new HashMap<>(size);

Review comment:
       `U.newHashMap(size)`
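
       Same for `U.newHashSet(total)` above. Plain `new HashMap<>(size)` treats the argument as raw initial capacity, so the map rehashes once it is ~75% full; the utility pre-sizes the table for the expected element count. Roughly (the actual `IgniteUtils` implementation may differ):

       ```java
       // Capacity big enough to hold 'expSize' entries without a rehash
       // under the default 0.75 load factor (sketch, not the exact code).
       public static <K, V> HashMap<K, V> newHashMap(int expSize) {
           return new HashMap<>((int)(expSize / 0.75f) + 1);
       }
       ```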

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseMessage.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class SnapshotResponseMessage extends AbstractSnapshotMessage {
+    /** Snapshot response message type (value is {@code 178}). */

Review comment:
       `@code 179`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf closed pull request #9321: IGNITE-14744 Partition swap on checkpoint

Posted by GitBox <gi...@apache.org>.
Mmuzaf closed pull request #9321:
URL: https://github.com/apache/ignite/pull/9321


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


