Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/28 14:58:33 UTC

[GitHub] [ignite] Mmuzaf opened a new pull request #9539: IGNITE-14744 Restore cache groups from snapshot taken on other topology

Mmuzaf opened a new pull request #9539:
URL: https://github.com/apache/ignite/pull/9539


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r749250330



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1118,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, ArrayList<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new ArrayList<>())) {

Review comment:
       `Collections.emptyList()` is parameterized with the `Object` class here, so it's not possible to use it.
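
For illustration only, here is a minimal sketch of the typing problem behind this comment. The class and method names (`EmptyListFallback`, `countAll`) are hypothetical, and `String` stands in for `SnapshotMetadata`. The assumption is that the map values are declared as `ArrayList<...>`, as in the hunk above: `Optional<ArrayList<T>>.orElse(...)` then expects an `ArrayList<T>`, which the `List` returned by `Collections.emptyList()` does not satisfy. Widening the `Optional` to `List<T>` is one possible way around it, not necessarily what the PR should do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper; String stands in for SnapshotMetadata. */
final class EmptyListFallback {
    /** Counts all values in the map, treating a null list as empty. */
    static int countAll(Map<String, ArrayList<String>> metas) {
        int cnt = 0;

        for (Map.Entry<String, ArrayList<String>> e : metas.entrySet()) {
            // Optional.ofNullable(e.getValue()) would be Optional<ArrayList<String>>,
            // and its orElse(...) would reject Collections.emptyList().
            // Declaring the Optional as Optional<List<String>> makes the fallback acceptable.
            List<String> vals = Optional.<List<String>>ofNullable(e.getValue())
                .orElse(Collections.emptyList());

            cnt += vals.size();
        }

        return cnt;
    }
}
```

The trade-off is an explicit type witness on `ofNullable`; the original `orElse(new ArrayList<>())` avoids it at the cost of an allocation per null entry.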







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r752259824



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);

Review comment:
       I see no reason to overcomplicate this. The loop provides the same guarantees as just `new HashSet<>(queue)` + `active`.
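
A minimal sketch of the simpler form suggested here, using hypothetical names (`ActiveTasksSketch`, `enqueue`, `activate`) and a generic `T` in place of the receiver future. It is not the PR's code, only an illustration of copying the queue once and adding the current task when it is present.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;

/** Hypothetical stand-in for the manager; T plays the role of the receiver future. */
final class ActiveTasksSketch<T> {
    /** Tasks waiting to be executed. */
    private final Queue<T> queue = new ConcurrentLinkedDeque<>();

    /** Task currently being executed, may be null. */
    private volatile T active;

    void enqueue(T task) {
        queue.offer(task);
    }

    void activate(T task) {
        active = task;
    }

    /** @return Queued tasks plus the current one, if any. */
    Set<T> activeTasks() {
        // One weakly consistent copy of the queue, no retry loop.
        Set<T> futs = new HashSet<>(queue);

        T curr = active;

        if (curr != null)
            futs.add(curr);

        return futs;
    }
}
```

Checking `active` for null before adding it also makes the trailing `filter(Objects::nonNull)` pass unnecessary in this sketch.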

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,7 +3485,7 @@ else if (task instanceof ForceRebalanceExchangeTask) {
                             if (task instanceof ForceRebalanceExchangeTask)
                                 forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                            for (CacheGroupContext grp : assignsSet.descendingSet()) {
+                        for (CacheGroupContext grp : assignsSet.descendingSet()) {

Review comment:
       Still not fixed

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndRe
 
                 if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
                     if (exchActions != null) {
-                        Set<String> caches = exchActions.cachesToResetLostPartitions();
+                    Set<String> caches = exchActions.cachesToResetLostPartitions();

Review comment:
       Indent not fixed. All other changes to this file should be reverted too.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)

Review comment:
       Let's process the `stopping` flag at the beginning of the method and complete all futures in the queue in a loop, to avoid a recursive call and a possible stack overflow.
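
One way this suggestion could look, sketched with `CompletableFuture` as a stand-in for the receiver future. All names here (`SequentialSubmitter`, `queued`, the exception type and message) are assumptions for illustration and do not come from the PR. With the `stopping` check first, queued tasks are failed in a plain loop and never re-enter `submit()` through a completion listener.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; CompletableFuture stands in for the receiver future. */
final class SequentialSubmitter {
    private final Queue<CompletableFuture<Void>> queue = new ArrayDeque<>();

    private CompletableFuture<Void> active;

    private boolean stopping;

    synchronized void submit(CompletableFuture<Void> next) {
        if (stopping) {
            // Fail the new task and drain the queue iteratively.
            // None of these futures has a listener that calls back into submit().
            next.completeExceptionally(new IllegalStateException("Node is stopping"));

            CompletableFuture<Void> queued;

            while ((queued = queue.poll()) != null)
                queued.completeExceptionally(new IllegalStateException("Node is stopping"));

            return;
        }

        if (active == null || active.isDone()) {
            active = next;

            // Schedule the next queued task once this one completes.
            next.whenComplete((res, err) -> scheduleNext());

            // The real code would send the request here (next.init()).
        }
        else
            queue.offer(next);
    }

    private synchronized void scheduleNext() {
        CompletableFuture<Void> next = queue.poll();

        if (next != null)
            submit(next);
    }

    synchronized void stop() {
        stopping = true;
    }

    synchronized int queued() {
        return queue.size();
    }
}
```

In this sketch a submission after `stop()` fails immediately and takes every queued task with it, so the depth of the call stack no longer depends on the queue length.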

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       Should we really shut down the node in this case? It looks like this failure does not lead to node inconsistency. Why not just send a `SnapshotFilesFailureMessage`?
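
   A minimal sketch of the alternative suggested here, assuming the error is scoped to a single request: reply to the requester with a failure message and keep the node running, escalating only JVM-level errors. All types below (`FailureReply`, `onRequest`, the `sent` list, the `nodeFailed` flag) are hypothetical stand-ins for illustration, not the actual Ignite classes or the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Illustration only: hypothetical stand-ins, not the Ignite message listener. */
class RequestErrorReplySketch {
    /** Simplified analogue of a failure response sent back to the requesting node. */
    static final class FailureReply {
        final UUID to;
        final String reqId;
        final String errMsg;

        FailureReply(UUID to, String reqId, String errMsg) {
            this.to = to;
            this.reqId = reqId;
            this.errMsg = errMsg;
        }
    }

    /** Replies that would have been sent over the network. */
    final List<FailureReply> sent = new ArrayList<>();

    /** Becomes true only when an error is escalated to the (imaginary) failure handler. */
    boolean nodeFailed;

    /** Runs the request handler and decides how to react to a failure. */
    void onRequest(UUID nodeId, String reqId, Runnable handler) {
        try {
            handler.run();
        }
        catch (RuntimeException e) {
            // Request-scoped problem: tell the requester and keep the node alive.
            sent.add(new FailureReply(nodeId, reqId, String.valueOf(e.getMessage())));
        }
        catch (Error e) {
            // JVM-level problem (for example OutOfMemoryError): escalate.
            nodeFailed = true;
        }
    }

    public static void main(String[] args) {
        RequestErrorReplySketch lsnr = new RequestErrorReplySketch();

        lsnr.onRequest(UUID.randomUUID(), "req-1", () -> {
            throw new IllegalStateException("snapshot part is missing");
        });

        System.out.println("replies=" + lsnr.sent.size() + ", nodeFailed=" + lsnr.nodeFailed);
    }
}
```

   Whether a given exception is really request-scoped is a judgment the real handler would have to make; the sketch only shows the two paths side by side.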




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r747573383



##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
##########
@@ -277,7 +280,8 @@ protected void ensureCacheAbsent(CacheConfiguration<?, ?> ccfg) throws IgniteChe
     protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
         return ccfg.setCacheMode(CacheMode.PARTITIONED)
             .setBackups(2)
-            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setAffinity(new RendezvousAffinityFunction(false, 4));

Review comment:
       What's wrong with the default partition count? Only 4 partitions looks too small, especially when there are more than 4 nodes in the cluster.
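
   To illustrate the concern: with 4 partitions, at most 4 nodes can be primary owners, so on a 6-node cluster at least 2 nodes hold no primary partition at all. The toy assignment below ranks nodes per partition by a hash weight, in the spirit of rendezvous hashing. The `weight` function and the class itself are assumptions made for this sketch and are not the hashing `RendezvousAffinityFunction` actually uses; 1024 is that function's default partition count.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustration only: a toy rendezvous-style assignment, not Ignite's affinity code. */
class PartitionSpreadSketch {
    /** Toy mixing function used as the (node, partition) weight. */
    static long weight(int node, int part) {
        long h = node * 0x9E3779B97F4A7C15L ^ part * 0xC2B2AE3D27D4EB4FL;
        h ^= h >>> 31;
        h *= 0xBF58476D1CE4E5B9L;
        return h ^ (h >>> 29);
    }

    /** For each partition returns its owners: the primary first, then the backups. */
    static List<List<Integer>> assign(int nodes, int parts, int backups) {
        List<List<Integer>> res = new ArrayList<>();

        for (int p = 0; p < parts; p++) {
            final int part = p;
            List<Integer> ranked = new ArrayList<>();

            for (int n = 0; n < nodes; n++)
                ranked.add(n);

            // Highest weight wins, as in rendezvous (highest random weight) hashing.
            ranked.sort(Comparator.comparingLong((Integer n) -> weight(n, part)).reversed());

            res.add(new ArrayList<>(ranked.subList(0, Math.min(nodes, backups + 1))));
        }

        return res;
    }

    /** Counts distinct nodes that are primary for at least one partition. */
    static int primaryOwners(List<List<Integer>> assignment) {
        Set<Integer> owners = new HashSet<>();

        for (List<Integer> o : assignment)
            owners.add(o.get(0));

        return owners.size();
    }

    public static void main(String[] args) {
        System.out.println("4 partitions, 6 nodes: primary owners = " + primaryOwners(assign(6, 4, 2)));
        System.out.println("1024 partitions, 6 nodes: primary owners = " + primaryOwners(assign(6, 1024, 2)));
    }
}
```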

##########
File path: modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreWithIndexingTest.java
##########
@@ -146,7 +147,12 @@ public void testClusterSnapshotRestoreOnBiggerTopology() throws Exception {
 
         awaitPartitionMapExchange();
 
-        assertCacheKeys(ignite.cache(DEFAULT_CACHE_NAME).withKeepBinary(), CACHE_KEYS_RANGE);
+        // On one node the index will be copied right from snapshot, on the other it will be rebuilt.

Review comment:
       Why? We restore a 2-node snapshot on a 4-node cluster, so the partitions can be distributed in any way, not necessarily 1:1.
   Where do we check this statement? Here we just wait for index rebuild futures, if any exist.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** */
+    private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+    /** */
+    private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+    /** {@code true} if snapshot parts has been initialized on test-class startup. */
+    private static boolean inited;
+
+    /** Snapshot parts on dedicated cluster. Each part has its own local directory. */
+    private static final Set<Path> snpParts = new HashSet<>();
+
+    /** */
+    private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
+        new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
+            @Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
+                return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
+                    prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+            }
+        };
+
+    /** Cache value builder. */
+    private final Function<Integer, Object> valBuilder = String::valueOf;
+
+    /** {@inheritDoc} */
+    @Override protected Function<Integer, Object> valueBuilder() {
+        return valBuilder;
+    }

Review comment:
       Redundant

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRestoreFromRemoteTest.java
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED;
+import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.partId;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RESTORE_CACHE_GROUP_SNAPSHOT_START;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class IgniteSnapshotRestoreFromRemoteTest extends IgniteClusterSnapshotRestoreBaseTest {
+    /** */
+    private static final String FIRST_CLUSTER_PREFIX = "one_";
+
+    /** */
+    private static final String SECOND_CLUSTER_PREFIX = "two_";
+
+    /** {@code true} if snapshot parts has been initialized on test-class startup. */
+    private static boolean inited;
+
+    /** Snapshot parts on dedicated cluster. Each part has its own local directory. */
+    private static final Set<Path> snpParts = new HashSet<>();
+
+    /** */
+    private static final Function<String, BiFunction<Integer, IgniteConfiguration, String>> CLUSTER_DIR =
+        new Function<String, BiFunction<Integer, IgniteConfiguration, String>>() {
+            @Override public BiFunction<Integer, IgniteConfiguration, String> apply(String prefix) {
+                return (id, cfg) -> Paths.get(defaultWorkDirectory().toString(),
+                    prefix + U.maskForFileName(cfg.getIgniteInstanceName())).toString();
+            }
+        };
+
+    /** Cache value builder. */
+    private final Function<Integer, Object> valBuilder = String::valueOf;
+
+    /** {@inheritDoc} */
+    @Override protected Function<Integer, Object> valueBuilder() {
+        return valBuilder;
+    }
+
+    /** @throws Exception If fails. */
+    @Before
+    public void prepareDedicatedSnapshot() throws Exception {
+        if (!inited) {
+            cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+
+            CacheConfiguration<Integer, Object> cacheCfg1 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE1)).setGroupName(SHARED_GRP);
+
+            CacheConfiguration<Integer, Object> cacheCfg2 =
+                txCacheConfig(new CacheConfiguration<Integer, Object>(CACHE2)).setGroupName(SHARED_GRP);
+
+            IgniteEx ignite = startDedicatedGridsWithCache(FIRST_CLUSTER_PREFIX, 6, CACHE_KEYS_RANGE, valBuilder,
+                dfltCacheCfg.setBackups(0), cacheCfg1, cacheCfg2);
+
+            ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+            awaitPartitionMapExchange();
+            stopAllGrids();
+
+            snpParts.addAll(findSnapshotParts(FIRST_CLUSTER_PREFIX, SNAPSHOT_NAME));
+
+            inited = true;
+        }
+
+        beforeTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If fails. */
+    @After
+    public void afterSwitchSnapshot() throws Exception {
+        afterTestSnapshot();
+        cleanupDedicatedPersistenceDirs(SECOND_CLUSTER_PREFIX);
+    }
+
+    /** */
+    @AfterClass
+    public static void cleanupSnapshot() {
+        snpParts.forEach(U::delete);
+        cleanupDedicatedPersistenceDirs(FIRST_CLUSTER_PREFIX);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testRestoreAllGroups() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        // Restore all cache groups.
+        grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null).get(TIMEOUT);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertCacheKeys(scc.cache(DEFAULT_CACHE_NAME), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE1), CACHE_KEYS_RANGE);
+        assertCacheKeys(scc.cache(CACHE2), CACHE_KEYS_RANGE);
+
+        waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSnapshotCachesStoppedIfLoadingFailOnRemote() throws Exception {
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        grid(0).cache(DEFAULT_CACHE_NAME).destroy();
+
+        IgniteSnapshotManager mgr = snp(grid(1));
+        mgr.remoteSnapshotSenderFactory(new BiFunction<String, UUID, SnapshotSender>() {
+            @Override public SnapshotSender apply(String s, UUID uuid) {
+                return new DelegateSnapshotSender(log, mgr.snapshotExecutorService(), mgr.remoteSnapshotSenderFactory(s, uuid)) {
+                    @Override public void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                        if (partId(part.getName()) > 0)
+                            throw new IgniteException("Test exception. Uploading partition file failed: " + pair);
+
+                        super.sendPart0(part, cacheDirName, pair, length);
+                    }
+                };
+            }
+        });
+
+        IgniteFuture<?> fut = grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, null);
+
+        GridTestUtils.assertThrowsAnyCause(log,
+            () -> fut.get(TIMEOUT),
+            IgniteException.class,
+            "Test exception. Uploading partition file failed");
+        assertNull(scc.cache(DEFAULT_CACHE_NAME));
+        ensureCacheAbsent(dfltCacheCfg);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSnapshotCachesStoppedIfNodeCrashed() throws Exception {
+        CacheConfiguration<?, ?> ccfg0 = dfltCacheCfg;
+        dfltCacheCfg = null;
+
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 3);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        copyAndShuffle(snpParts, G.allGrids());
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2));
+
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi, RESTORE_CACHE_GROUP_SNAPSHOT_START, DEFAULT_CACHE_NAME);
+        IgniteInternalFuture<?> stopFut = runAsync(() -> stopGrid(2, true));
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> fut.get(TIMEOUT),
+            ClusterTopologyCheckedException.class,
+            "Required node has left the cluster"
+        );
+
+        stopFut.get(TIMEOUT);
+
+        awaitPartitionMapExchange();
+        ensureCacheAbsent(ccfg0);
+
+        Ignite g3 = startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+        awaitPartitionMapExchange();
+        assertNull(g3.cache(DEFAULT_CACHE_NAME));
+        ensureCacheAbsent(ccfg0);
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSnapshotRestoreDuringForceReassignment() throws Exception {
+        String locCacheName = "IgniteTestCache";
+        dfltCacheCfg = null;
+
+        IgniteEx scc = startDedicatedGrids(SECOND_CLUSTER_PREFIX, 2);
+        scc.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<Integer, Object> cache = scc.getOrCreateCache(new CacheConfiguration<Integer, Object>(locCacheName)
+            .setAffinity(new RendezvousAffinityFunction(false, 8)).setBackups(2));
+
+        for (int i = 0; i < CACHE_KEYS_RANGE; i++)
+            cache.put(i, valBuilder.apply(i));
+
+        forceCheckpoint();
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(grid(0));
+        spi0.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+
+        startDedicatedGrid(SECOND_CLUSTER_PREFIX, 2);
+
+        resetBaselineTopology();
+        spi0.waitForBlocked();
+
+        copyAndShuffle(snpParts, G.allGrids());
+        IgniteFuture<Void> fut = waitForBlockOnRestore(spi0, RESTORE_CACHE_GROUP_SNAPSHOT_START, DEFAULT_CACHE_NAME);
+
+        IgniteFuture<?> forceRebFut = cache.rebalance();

Review comment:
       This method will be removed soon. And AFAIK it only triggers rebalance when rebalance is delayed, so you will get the same result even without waiting for this future.
   Also, perhaps this test was left over from the old implementation; I don't understand how rebalance relates to the current implementation.

##########
File path: modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
##########
@@ -623,7 +615,7 @@ public void testNodeFailDuringFilesCopy() throws Exception {
         IgniteFuture<Void> fut =
             grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(DEFAULT_CACHE_NAME));
 
-        stopFut.get(TIMEOUT);
+        stopFut.get();

Review comment:
       Why was the timeout removed?
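
   For context on why the bound matters in a test: an unbounded `get()` on a future that never completes hangs the whole run, while a bounded wait fails fast. The sketch below uses the JDK's `CompletableFuture` as a stand-in for `IgniteInternalFuture`; the helper name `finishedInTime` is made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustration only: JDK futures used as a stand-in for Ignite futures. */
class BoundedWaitSketch {
    /** Returns true if the future completed within the limit, false if the wait timed out. */
    static boolean finishedInTime(CompletableFuture<?> fut, long timeoutMs) {
        try {
            fut.get(timeoutMs, TimeUnit.MILLISECONDS);

            return true;
        }
        catch (TimeoutException e) {
            // The bounded wait gives control back instead of blocking forever.
            return false;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A future nobody completes: get() without a timeout would block here forever.
        CompletableFuture<Void> hung = new CompletableFuture<>();

        System.out.println(finishedInTime(hung, 100)); // prints "false"
        System.out.println(finishedInTime(CompletableFuture.completedFuture(null), 100)); // prints "true"
    }
}
```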







[GitHub] [ignite] Mmuzaf merged pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
Mmuzaf merged pull request #9539:
URL: https://github.com/apache/ignite/pull/9539


   






[GitHub] [ignite] Mmuzaf commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r752637110



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       I think we should catch and handle all the assertions that may occur during the message processing. From my point of view assertion == node failure.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2251,626 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotFilesRecevier extends GridFutureAdapter<Void> {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotFilesRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotFilesRecevier(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotFilesRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** Initiate handler by sending request message. */
+        public synchronized void init() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            try {
+                partHnd.accept(part, null);
+            }
+            catch (IgniteInterruptedException e) {
+                throw new TransmissionCancelledException(e.getMessage());
+            }
+
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotFilesRecevier future = (RemoteSnapshotFilesRecevier)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotFilesRecevier.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotFilesRecevier active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotFilesRecevier> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling.
+         */
+        public synchronized void submit(IgniteSnapshotManager.RemoteSnapshotFilesRecevier next) {
+            assert next != null;
+
+            RemoteSnapshotFilesRecevier curr = active;
+
+            if (curr == null || curr.isDone()) {
+                next.listen(f -> scheduleNext());
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.init();
+            }
+            else
+                queue.offer(next);
+        }
+
+        /** Schedule next async receiver. */
+        private synchronized void scheduleNext() {
+            RemoteSnapshotFilesRecevier next = queue.poll();
+
+            if (next == null)
+                return;
+
+            submit(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotFilesRecevier> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotFilesRecevier> activeTasks() {
+            Set<RemoteSnapshotFilesRecevier> futs = new HashSet<>();
+
+            RemoteSnapshotFilesRecevier curr = active;
+            RemoteSnapshotFilesRecevier changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotFilesRequestMessage) {
+                    SnapshotFilesRequestMessage reqMsg0 = (SnapshotFilesRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = registerTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new SnapshotFilesFailureMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotFilesFailureMessage) {
+                    SnapshotFilesFailureMessage respMsg0 = (SnapshotFilesFailureMessage)msg;
+
+                    RemoteSnapshotFilesRecevier task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

Review comment:
       I think we should catch and handle all the assertions that may occur during the message processing. From my point of view assertion == node failure.
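
A minimal sketch of the idea in the comment above, under stated assumptions: `GuardedListener` and its `failures` list are hypothetical stand-ins for the message listener and the node failure processor, not Ignite APIs. It only illustrates why the handler catches `Throwable` rather than `Exception`: an `AssertionError` is an `Error`, so a narrower catch would let it escape the listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical listener wrapper; the names here are illustrative, not Ignite classes. */
final class GuardedListener {
    /** Stand-in for a failure processor: collects everything that was escalated. */
    final List<Throwable> failures = new ArrayList<>();

    /** The actual message processing logic. */
    private final Consumer<Object> delegate;

    GuardedListener(Consumer<Object> delegate) {
        this.delegate = delegate;
    }

    /** Processes a message; returns false if processing was escalated as a failure. */
    boolean onMessage(Object msg) {
        try {
            delegate.accept(msg);

            return true;
        }
        catch (Throwable t) {
            // AssertionError extends Error, so catching Exception alone would miss it.
            failures.add(t);

            return false;
        }
    }
}
```

With a delegate that throws `new AssertionError(...)` for an unexpected message, `onMessage` returns `false` and the error ends up in `failures` instead of propagating to the caller's thread.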







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r744705988



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,7 +3485,7 @@ else if (task instanceof ForceRebalanceExchangeTask) {
                             if (task instanceof ForceRebalanceExchangeTask)
                                 forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
 
-                            for (CacheGroupContext grp : assignsSet.descendingSet()) {
+                        for (CacheGroupContext grp : assignsSet.descendingSet()) {

Review comment:
       Wrong indent

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -96,9 +115,12 @@
     private final GridKernalContext ctx;
 
     /** Cache group restore prepare phase. */
-    private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
 
-    /** Cache group restore cache start phase. */
+    /**Cache group restore cache start phase. */

Review comment:
       A space is missing after `/**`.
   Also, this looks like the javadoc for `cacheStartProc`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+    /** Shared context. */
+    protected final GridCacheSharedContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Node id which cause snapshot operation. */
+    protected final UUID srcNodeId;
+
+    /** Unique identifier of snapshot process. */
+    protected final String snpName;
+
+    /** Snapshot working directory on file system. */
+    protected final File tmpSnpWorkDir;
+
+    /** IO factory which will be used for creating snapshot delta-writers. */
+    protected final FileIOFactory ioFactory;
+
+    /** Snapshot data sender. */
+    @GridToStringExclude
+    protected final SnapshotSender snpSndr;
+
+    /** Partition to be processed. */
+    protected final Map<Integer, Set<Integer>> parts;
+
+    /** An exception which has been occurred during snapshot processing. */
+    protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+    /**
+     * @param cctx Shared context.
+     * @param srcNodeId Node id which cause snapshot task creation.
+     * @param snpName Unique identifier of snapshot process.
+     * @param tmpWorkDir Working directory for intermediate snapshot results.
+     * @param ioFactory Factory to working with snapshot files.
+     * @param snpSndr Factory which produces snapshot receiver instance.
+     * @param parts Partition to be processed.
+     */
+    protected AbstractSnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        String snpName,
+        File tmpWorkDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not null.";
+
+        this.cctx = cctx;
+        this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+        this.srcNodeId = srcNodeId;
+        this.snpName = snpName;
+        this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+        this.ioFactory = ioFactory;
+        this.snpSndr = snpSndr;
+        this.parts = parts;
+    }
+
+    /**
+     * @return Snapshot name.
+     */
+    public String snapshotName() {
+        return snpName;
+    }
+
+    /**
+     * @return Node id which triggers this operation.
+     */
+    public UUID sourceNodeId() {
+        return srcNodeId;
+    }
+
+    /**
+     * // TODO started and start methods can be merged.

Review comment:
       The TODO should include a link to the ticket.
   The method name is strange (taking the return value into account), and it is strange to have it in the parent class.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.run();

Review comment:
       `Runnable` is not a good interface here, since someone might think that the future completes when the `run()` method completes. Actually, the `run()` method only initializes the task.
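
A small sketch of the distinction, assuming a plain `CompletableFuture` in place of `GridFutureAdapter`; `ReceiverSketch`, `init()` and `onLastFileReceived()` are hypothetical names chosen for illustration. The start method only sends the request, and the future completes later from a different call, which is why a name like `init()` may describe it better than `Runnable.run()`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical receiver: a future whose start method only initiates the work. */
final class ReceiverSketch extends CompletableFuture<Void> {
    /** Set once the (simulated) request has been sent. */
    private boolean requested;

    /** Only initiates the work; the future is still incomplete when this returns. */
    synchronized void init() {
        if (isDone())
            return;

        requested = true; // In real code: send the request message to the remote node.
    }

    /** Called later, for example when the last requested file has arrived. */
    synchronized void onLastFileReceived() {
        if (requested)
            complete(null);
    }
}
```

After `init()` returns, `isDone()` is still `false`; it becomes `true` only once `onLastFileReceived()` has been called.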

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+                    opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+            }
+
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+                                .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + ']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+                    if (m.getKey().equals(ctx.localNodeId()))
+                        continue;
+
+                    ctx.cache().context().snapshotMgr()
+                        .requestRemoteSnapshot(m.getKey(),
+                            opCtx0.snpName,
+                            m.getValue(),
+                            opCtx0.stopChecker,
+                            (snpFile, t) -> {
+                                if (opCtx0.stopChecker.getAsBoolean())
+                                    throw new TransmissionCancelledException("Snapshot remote operation request cancelled.");

Review comment:
       This exception type looks strange here. The `preload` method knows nothing about the internals of `requestRemoteSnapshot`, yet it looks like this exception is required by some workflow inside `requestRemoteSnapshot`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends IgniteCheckedException {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** @param cause Error. */
+        public RestoreCacheStartException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private ArrayList<StoredCacheData> ccfgs;

Review comment:
       `ArrayList` -> `List`, final
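
       A minimal sketch of the suggested shape, using a hypothetical `RestoreResponseSketch` class with `String` elements in place of `StoredCacheData`. The field is declared through the `List` interface and made `final`; the constructor still stores an `ArrayList`, so Java serialization keeps working even though `List` itself does not extend `Serializable`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the prepare response; not the class from the PR. */
class RestoreResponseSketch implements Serializable {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /** Cache configurations: interface type, assigned once. */
    private final List<String> ccfgs;

    /** @param ccfgs Cache configurations; copied so that later changes by the caller are not visible. */
    RestoreResponseSketch(List<String> ccfgs) {
        this.ccfgs = new ArrayList<>(ccfgs);
    }

    /** @return Cache configurations. */
    List<String> ccfgs() {
        return ccfgs;
    }

    /** Serializes and deserializes the given response to show that the final List field survives. */
    static RestoreResponseSketch roundTrip(RestoreResponseSketch src) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(src);
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (RestoreResponseSketch)in.readObject();
            }
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

       Whether the project's static analysis accepts a non-`Serializable` field type in a `Serializable` class is a separate question that this sketch does not settle.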

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -315,28 +327,44 @@ protected void cleanup() throws IgniteCheckedException {
     }
 
     /**
-     * Check if the cache or group with the specified name is currently being restored from the snapshot.
-     *
-     * @param cacheName Cache name.
-     * @param grpName Cache group name.
+     * @param opCtx Restoring context.
+     * @return The request id of restoring snapshot operation.
+     */
+    private @Nullable UUID restoringId(@Nullable SnapshotRestoreContext opCtx) {

Review comment:
       Not used

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1440,13 +1504,46 @@ public static boolean isSnapshotOperation(DiscoveryEvent evt) {
         }
     }
 
+    /**
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+     * @param rmtNodeId The remote node to connect to.
+     * @param partHnd Received partition handler.
+     */
+    public IgniteInternalFuture<Void> requestRemoteSnapshot(

Review comment:
       I don't like the naming. This method does not request that a remote snapshot be made; it requests remote snapshot files that already exist. `RemoteSnapshotHandler` does not handle remote snapshots; it receives remote snapshot files. `SnapshotRequestMessage` does not request that a snapshot be made; it requests already existing files. `SnapshotResponseMessage` is sent only when an error occurs while sending files, so it would be better to reflect that in the name (`SnapshotFilesFailureResponse`, for example). And so on.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndRe
 
                 if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
                     if (exchActions != null) {
-                        Set<String> caches = exchActions.cachesToResetLostPartitions();
+                    Set<String> caches = exchActions.cachesToResetLostPartitions();

Review comment:
       Wrong indent. Nothing changed in this class except indents.

##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessorBinarizable.java
##########
@@ -19,7 +19,6 @@
 
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
-

Review comment:
       Not related to this fix

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,

Review comment:
       The method returns `SnapshotFinishedFutureTask` in some cases, but the result is force-cast to `SnapshotFutureTask`.
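
       A minimal sketch of the concern. The classes below are hypothetical stand-ins (`AbstractTask`, `WorkTask`, `FinishedTask` are not Ignite types), and the sketch assumes the "finished" task is a sibling of the concrete task rather than its subclass. Under that assumption the forced cast fails at runtime exactly on the error paths:

```java
/** Hypothetical stand-in for the abstract snapshot task. */
abstract class AbstractTask {
    abstract String describe();
}

/** Hypothetical stand-in for the regular task type expected by the caller. */
class WorkTask extends AbstractTask {
    @Override String describe() {
        return "work";
    }
}

/** Hypothetical stand-in for the already-failed task returned on error paths. */
class FinishedTask extends AbstractTask {
    private final Exception err;

    FinishedTask(Exception err) {
        this.err = err;
    }

    @Override String describe() {
        return "finished: " + err.getMessage();
    }
}

class DowncastSketch {
    /** Mirrors the generic registration method: returns a different subclass on failure. */
    static AbstractTask register(AbstractTask task, boolean stopping) {
        if (stopping)
            return new FinishedTask(new IllegalStateException("manager is stopping"));

        return task;
    }

    /** Mirrors the typed overload that force-casts the result. */
    static WorkTask registerWork(boolean stopping) {
        return (WorkTask)register(new WorkTask(), stopping);
    }

    /** @return {@code true} if the forced cast throws ClassCastException. */
    static boolean castFails(boolean stopping) {
        try {
            registerWork(stopping);

            return false;
        }
        catch (ClassCastException e) {
            return true;
        }
    }
}
```

       One way to avoid this, if the caller does not need the concrete type, is to return the abstract type from the typed overload as well.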

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
         boolean withMetaStorage,
         SnapshotSender snpSndr
     ) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
+            ioFactory, snpSndr, parts, withMetaStorage, locBuff));
+    }
+
+    /**
+     * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+     */
+    private AbstractSnapshotFutureTask<?> registerSnapshotTask(String rqId, AbstractSnapshotFutureTask<?> task) {
         if (!busyLock.enterBusy()) {
-            return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+            return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+                cctx.localNodeId() + ']'));
         }
 
         try {
-            if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
-
-            SnapshotFutureTask snpFutTask;
+            if (locSnpTasks.containsKey(rqId)) {
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+                    rqId));
+            }
 
-            SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
-                snpFutTask = new SnapshotFutureTask(cctx,
-                    srcNodeId,
-                    snpName,
-                    tmpWorkDir,
-                    ioFactory,
-                    snpSndr,
-                    parts,
-                    withMetaStorage,
-                    locBuff));
+            AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, task);
 
             if (prev != null)
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
-
-            if (!withMetaStorage) {

Review comment:
       Why was this check removed?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));

Review comment:
       The race is still here: when a task is done, the listener first polls the queue (getting a null result) and then passes it to the synchronized `submit` method; at the same time, another thread can already be inside the synchronized `submit` method, just before it offers the next task to the queue.
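
       A minimal sketch of one way to close this window, assuming the queue is only ever touched under the same lock that guards the "active task" state. `SequentialScheduler`, `submit` and `onTaskDone` are hypothetical names for illustration, not the PR's API; tasks are modelled as start actions that complete asynchronously and report completion through `onTaskDone`:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical scheduler that starts queued tasks strictly one by one. */
class SequentialScheduler {
    /** Pending start actions; accessed only while holding the monitor. */
    private final Queue<Runnable> queue = new ArrayDeque<>();

    /** Whether some task has been started and has not yet reported completion. */
    private boolean busy;

    /** Starts the task immediately if nothing is active, otherwise enqueues it. */
    public void submit(Runnable startAction) {
        synchronized (this) {
            if (busy) {
                // The busy check and the offer happen atomically.
                queue.add(startAction);

                return;
            }

            busy = true;
        }

        startAction.run();
    }

    /** Must be called once when the active task completes. */
    public void onTaskDone() {
        Runnable next;

        synchronized (this) {
            // The poll happens under the same lock as the offer in submit(),
            // so a concurrently submitted task is either seen here or started directly.
            next = queue.poll();

            if (next == null) {
                busy = false;

                return;
            }
        }

        next.run();
    }
}
```

       The point of the sketch is only that polling and the idle/busy decision share one critical section; error handling and node-stop handling are left out.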

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+                    opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+            }
+
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+                                .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + ']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+                    if (m.getKey().equals(ctx.localNodeId()))
+                        continue;

Review comment:
       Is this possible? It looks like only remote nodes should be in `snpAff`.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
         }
 
         if (failure == null)
-            failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+            failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
 
         // Context has been created - should rollback changes cluster-wide.
         if (failure != null) {
-            opCtx0.err.compareAndSet(null, failure);
-
-            if (U.isLocalNodeCoordinator(ctx.discovery()))
-                rollbackRestoreProc.start(reqId, reqId);
+            opCtx0.errHnd.accept(failure);
 
             return;
         }
 
         Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
 
-        for (List<StoredCacheData> storedCfgs : res.values()) {
-            if (storedCfgs == null)
-                continue;
+        for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e : res.entrySet()) {
+            if (e.getValue().ccfgs != null) {
+                for (StoredCacheData cacheData : e.getValue().ccfgs) {
+                    globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
 
-            for (StoredCacheData cacheData : storedCfgs)
-                globalCfgs.put(CU.cacheId(cacheData.config().getName()), cacheData);
+                    opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+                }
+            }
+
+            opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new ArrayList<>())
+                .addAll(e.getValue().metas);
         }
 
         opCtx0.cfgs = globalCfgs;
 
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            preloadProc.start(reqId, reqId);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param cache Discovery cache.
+     * @return Map of affinity per each cache group.
+     */
+    private static GridAffinityAssignmentCache calculateAffinity(
+        GridKernalContext ctx,
+        CacheConfiguration<?, ?> ccfg,
+        DiscoCache cache
+    ) {
+        GridAffinityAssignmentCache affCache = GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+        affCache.calculate(cache.version(), null, cache);
+
+        return affCache;
+    }
+
+    /**
+     * @param metas List of snapshot metadata to check.
+     * @param grpId Group id.
+     * @param parts Set of partitions to search for.
+     * @return Snapshot metadata which contains a full set of given partitions or {@code null} the otherwise.
+     */
+    private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+        List<SnapshotMetadata> metas,
+        int grpId,
+        Set<Integer> parts
+    ) {
+        assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+        // Try to copy everything right from the single snapshot part.
+        for (SnapshotMetadata meta : metas) {
+            Set<Integer> grpParts = meta.partitions().get(grpId);
+            Set<Integer> grpWoIndex = grpParts == null ? Collections.emptySet() : new HashSet<>(grpParts);
+
+            grpWoIndex.remove(INDEX_PARTITION);
+
+            if (grpWoIndex.equals(parts))
+                return meta;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param reqId Request id.
+     * @return Future which will be completed when the preload ends.
+     */
+    private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+        GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+        if (opCtx0 == null)
+            return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot restore process has incorrect restore state: " + reqId));
+
+        if (opCtx0.dirs.isEmpty())
+            return new GridFinishedFuture<>();
+
+        try {
+            if (ctx.isStopping())
+                throw new NodeStoppingException("Node is stopping: " + ctx.localNodeId());
+
+            IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+            synchronized (this) {
+                opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+            }
+
+            if (log.isInfoEnabled()) {
+                log.info("Starting snapshot preload operation to restore cache groups" +
+                    "[snapshot=" + opCtx0.snpName +
+                    ", caches=" + F.transform(opCtx0.dirs, File::getName) + ']');
+            }
+
+            CompletableFuture<Void> metaFut = ctx.localNodeId().equals(opCtx0.opNodeId) ?
+                CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            SnapshotMetadata meta = F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+                            File binDir = binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+                                meta.folderName());
+
+                            ctx.cacheObjects().updateMetadata(binDir, opCtx0.stopChecker);
+                        }
+                        catch (Throwable t) {
+                            log.error("Unable to perform metadata update operation for the cache groups restore process", t);
+
+                            opCtx0.errHnd.accept(t);
+                        }
+                    }, snpMgr.snapshotExecutorService()) : CompletableFuture.completedFuture(null);
+
+            for (StoredCacheData data : opCtx0.cfgs.values()) {
+                opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+                    grp -> calculateAffinity(ctx, data.config(), opCtx0.discoCache));
+            }
+
+            // First preload everything from the local node.
+            List<SnapshotMetadata> locMetas = opCtx0.metasPerNode.get(ctx.localNodeId());
+
+            Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new HashMap<>();
+            ClusterNode locNode = ctx.cache().context().localNode();
+
+            for (File dir : opCtx0.dirs) {
+                String cacheOrGrpName = cacheGroupName(dir);
+                int grpId = CU.cacheId(cacheOrGrpName);
+
+                File tmpCacheDir = formatTmpDirName(dir);
+                tmpCacheDir.mkdir();
+
+                Set<PartitionRestoreFuture> leftParts;
+
+                opCtx0.locProgress.put(grpId,
+                    nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode, PartitionRestoreFuture::new));
+
+                rmtLoadParts.put(grpId, leftParts = new HashSet<>(opCtx0.locProgress.get(grpId)));
+
+                if (leftParts.isEmpty())
+                    continue;
+
+                SnapshotMetadata full = findMetadataWithSamePartitions(locMetas,
+                    grpId,
+                    leftParts.stream().map(p -> p.partId).collect(Collectors.toSet()));
+
+                for (SnapshotMetadata meta : full == null ? locMetas : Collections.singleton(full)) {
+                    if (leftParts.isEmpty())
+                        break;
+
+                    File snpCacheDir = new File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+                        Paths.get(databaseRelativePath(meta.folderName()), dir.getName()).toString());
+
+                    leftParts.removeIf(partFut -> {
+                        boolean doCopy = ofNullable(meta.partitions().get(grpId))
+                            .orElse(Collections.emptySet())
+                            .contains(partFut.partId);
+
+                        if (doCopy) {
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                partFut);
+                        }
+
+                        return doCopy;
+                    });
+
+                    if (meta == full) {
+                        assert leftParts.isEmpty() : leftParts;
+
+                        if (log.isInfoEnabled()) {
+                            log.info("The snapshot was taken on the same cluster topology. The index will be copied to " +
+                                "restoring cache group if necessary [snpName=" + opCtx0.snpName + ", dir=" + dir.getName() + ']');
+                        }
+
+                        File idxFile = new File(snpCacheDir, FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+                        if (idxFile.exists()) {
+                            PartitionRestoreFuture idxFut;
+
+                            opCtx0.locProgress.computeIfAbsent(grpId, g -> new HashSet<>())
+                                .add(idxFut = new PartitionRestoreFuture(INDEX_PARTITION));
+
+                            copyLocalAsync(ctx.cache().context().snapshotMgr(),
+                                opCtx,
+                                snpCacheDir,
+                                tmpCacheDir,
+                                idxFut);
+                        }
+                    }
+                }
+            }
+
+            // Load other partitions from remote nodes.
+            // This is necessary for sending only one partitions request per each cluster node.
+            Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(opCtx0.metasPerNode,
+                (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+                    rmtLoadParts.get(grpId).contains(new PartitionRestoreFuture(partId)));
+            Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+                .collect(Collectors.toMap(d -> CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+                    d -> d));
+
+            try {
+                if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+                    log.info("Trying to request partitions from remote nodes" +
+                        "[snapshot=" + opCtx0.snpName +
+                        ", map=" + snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> partitionsMapToCompactString(e.getValue()))) + ']');
+                }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {

Review comment:
       As far as I understand, `snpAff` contains the remote partitions required by the current node. But there can be duplicates here (two nodes can contain the same partitions), and it looks like if two nodes contain the same partitions, we request them from both nodes.
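
A minimal sketch of one way to avoid the duplicate requests, assuming `snpAff` maps node id -> group id -> partition ids as in the diff above. The class `PartitionRequestPlanner` and the method `assignOnce` are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical helper (not part of the PR): keeps each (group, partition) pair on a single node. */
final class PartitionRequestPlanner {
    /**
     * @param snpAff Node id -> group id -> partition ids available on that node.
     * @return Same structure, but every partition is listed only for the first node that offers it.
     */
    static Map<UUID, Map<Integer, Set<Integer>>> assignOnce(Map<UUID, Map<Integer, Set<Integer>>> snpAff) {
        // Group id -> partitions already assigned to some node.
        Map<Integer, Set<Integer>> taken = new HashMap<>();
        Map<UUID, Map<Integer, Set<Integer>>> res = new LinkedHashMap<>();

        for (Map.Entry<UUID, Map<Integer, Set<Integer>>> nodeEntry : snpAff.entrySet()) {
            for (Map.Entry<Integer, Set<Integer>> grpEntry : nodeEntry.getValue().entrySet()) {
                Set<Integer> seen = taken.computeIfAbsent(grpEntry.getKey(), g -> new HashSet<>());

                for (Integer part : grpEntry.getValue()) {
                    // Set.add returns false when the partition was already assigned to an earlier node.
                    if (seen.add(part)) {
                        res.computeIfAbsent(nodeEntry.getKey(), n -> new HashMap<>())
                            .computeIfAbsent(grpEntry.getKey(), g -> new HashSet<>())
                            .add(part);
                    }
                }
            }
        }

        return res;
    }
}
```

"First node wins" is an arbitrary choice here; a real implementation might prefer to balance the load between nodes.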

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -990,15 +1369,43 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
         /** Snapshot name. */
         private final String snpName;
 
-        /** Baseline node IDs that must be alive to complete the operation. */
-        private final Set<UUID> nodes;
+        /** Baseline discovery cache for node IDs that must be alive to complete the operation.*/
+        private final DiscoCache discoCache;
 
-        /** List of restored cache group directories. */
-        private final Collection<File> dirs;
+        /** Operational node id. */
+        private final UUID opNodeId;
+
+        /**
+         * Set of restored cache groups path on local node. Collected when all cache configurations received
+         * from the <tt>prepare</tt> distributed process.
+         */
+        private final Set<File> dirs = new HashSet<>();
 
         /** The exception that led to the interruption of the process. */
         private final AtomicReference<Throwable> err = new AtomicReference<>();
 
+        /** Distribution of snapshot metadata files across the cluster. */
+        private final Map<UUID, ArrayList<SnapshotMetadata>> metasPerNode = new HashMap<>();

Review comment:
       `ArrayList` -> `List`?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1118,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, ArrayList<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new ArrayList<>())) {

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`
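
A small sketch of the suggested fallback, using plain strings instead of `SnapshotMetadata` to stay self-contained (the class `EmptyListFallback` and its method are hypothetical). Note that `orElse(Collections.emptyList())` only compiles when the map value type is `List` rather than `ArrayList`, so this change goes together with the `ArrayList` -> `List` suggestion.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration (not part of the PR) of a null-safe iteration without extra allocation. */
final class EmptyListFallback {
    /** Counts the entries across all nodes, treating a null list as empty. */
    static int countAll(Map<String, List<String>> metasPerNode) {
        int cnt = 0;

        for (Map.Entry<String, List<String>> e : metasPerNode.entrySet()) {
            // Collections.emptyList() returns an immutable empty list,
            // so no new ArrayList is allocated for every null value.
            List<String> metas = Optional.ofNullable(e.getValue()).orElse(Collections.emptyList());

            cnt += metas.size();
        }

        return cnt;
    }
}
```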

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
         }
     }
 
+    /** Remote snapshot future which tracks remote snapshot transmission result. */
+    private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> implements Runnable {
+        /** Snapshot name to create. */
+        private final String reqId = RMT_SNAPSHOT_PREFIX + U.maskForFileName(UUID.randomUUID().toString());
+
+        /** Ignite snapshot manager. */
+        private final IgniteSnapshotManager snpMgr;
+
+        /** Initial message to send request. */
+        private final SnapshotRequestMessage initMsg;
+
+        /** Remote node id to request snapshot from. */
+        private final UUID rmtNodeId;
+
+        /** Process interrupt checker. */
+        private final BooleanSupplier stopChecker;
+
+        /** Partition handler given by request initiator. */
+        private final BiConsumer<File, Throwable> partHnd;
+
+        /** Temporary working directory for consuming partitions. */
+        private final Path dir;
+
+        /** Counter which show how many partitions left to be received. */
+        private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+        /**
+         * @param snpMgr Ignite snapshot manager.
+         * @param rmtNodeId Remote node to request snapshot from.
+         * @param snpName Snapshot name to request.
+         * @param parts Cache group and partitions to request.
+         * @param stopChecker Process interrupt checker.
+         * @param partHnd Partition handler.
+         */
+        public RemoteSnapshotHandler(
+            IgniteSnapshotManager snpMgr,
+            UUID rmtNodeId,
+            String snpName,
+            Map<Integer, Set<Integer>> parts,
+            BooleanSupplier stopChecker,
+            BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+        ) {
+            dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+            initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+            this.snpMgr = snpMgr;
+            this.rmtNodeId = rmtNodeId;
+            this.stopChecker = stopChecker;
+            this.partHnd = partHnd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void run() {
+            if (isDone())
+                return;
+
+            try {
+                ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+                if (rmtNode == null) {
+                    throw new ClusterTopologyCheckedException("Snapshot remote request cannot be performed. " +
+                        "Remote node left the grid [rmtNodeId=" + rmtNodeId + ']');
+                }
+
+                snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+                    DFLT_INITIAL_SNAPSHOT_TOPIC,
+                    initMsg,
+                    SYSTEM_POOL,
+                    Long.MAX_VALUE,
+                    true);
+
+                if (snpMgr.log.isInfoEnabled()) {
+                    snpMgr.log.info("Snapshot request is sent to the remote node [rmtNodeId=" + rmtNodeId +
+                        ", snpName=" + initMsg.snapshotName() + ", rqId=" + reqId + ']');
+                }
+            }
+            catch (Throwable t) {
+                onDone(t);
+            }
+        }
+
+        /**
+         * @param ex Exception occurred during receiving files.
+         */
+        public synchronized void acceptException(Throwable ex) {
+            if (isDone())
+                return;
+
+            try {
+                partHnd.accept(null, ex);
+            }
+            catch (Throwable t) {
+                ex.addSuppressed(t);
+            }
+
+            onDone(ex);
+        }
+
+        /**
+         * @param part Received file which needs to be handled.
+         */
+        public synchronized void acceptFile(File part) {
+            if (isDone())
+                return;
+
+            if (stopChecker.getAsBoolean())
+                throw new TransmissionCancelledException("Future cancelled prior to the all requested partitions processed.");
+
+            partHnd.accept(part, null);
+            partsLeft.decrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected synchronized boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
+            U.delete(dir);
+
+            return super.onDone(res, err, cancel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+            return Objects.equals(reqId, future.reqId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return reqId.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteSnapshotHandler.class, this);
+        }
+    }
+
+    /**
+     * This manager is responsible for requesting and handling snapshots from a remote node. Each snapshot request
+     * processed asynchronously but strictly one by one.
+     */
+    private class SequentialRemoteSnapshotManager implements TransmissionHandler, GridMessageListener {
+        /** A task currently being executed and must be explicitly finished. */
+        private volatile RemoteSnapshotHandler active;
+
+        /** Queue of asynchronous tasks to execute. */
+        private final Queue<RemoteSnapshotHandler> queue = new ConcurrentLinkedDeque<>();
+
+        /** {@code true} if the node is stopping. */
+        private volatile boolean stopping;
+
+        /**
+         * @param next New task for scheduling. May produce <tt>null</tt> value if the queue is empty.
+         */
+        public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+            RemoteSnapshotHandler curr = active;
+
+            if (curr == null || curr.isDone()) {
+                if (next == null)
+                    return;
+
+                next.listen(f -> submit(queue.poll()));
+
+                active = next;
+
+                if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+                else
+                    next.run();
+            }
+            else if (next != null)
+                queue.offer(next);
+        }
+
+        /** Stopping handler. */
+        public void stop() {
+            stopping = true;
+
+            Set<RemoteSnapshotHandler> futs = activeTasks();
+            GridCompoundFuture<Void, Void> stopFut = new GridCompoundFuture<>();
+
+            try {
+                for (IgniteInternalFuture<Void> fut : futs)
+                    stopFut.add(fut);
+
+                stopFut.markInitialized().get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @param nodeId A node left the cluster.
+         */
+        public void onNodeLeft(UUID nodeId) {
+            Set<RemoteSnapshotHandler> futs = activeTasks();
+            ClusterTopologyCheckedException ex = new ClusterTopologyCheckedException("The node from which a snapshot has been " +
+                "requested left the grid");
+
+            futs.forEach(t -> {
+                if (t.rmtNodeId.equals(nodeId))
+                    t.acceptException(ex);
+            });
+        }
+
+        /**
+         * @return The set of currently scheduled tasks, some of them may be already completed.
+         */
+        private Set<RemoteSnapshotHandler> activeTasks() {
+            Set<RemoteSnapshotHandler> futs = new HashSet<>();
+
+            RemoteSnapshotHandler curr = active;
+            RemoteSnapshotHandler changed;
+
+            do {
+                futs.addAll(queue);
+                futs.add(curr);
+
+                changed = curr;
+            } while ((curr = active) != changed);
+
+            return futs.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (msg instanceof SnapshotRequestMessage) {
+                    SnapshotRequestMessage reqMsg0 = (SnapshotRequestMessage)msg;
+                    String rqId = reqMsg0.requestId();
+                    String snpName = reqMsg0.snapshotName();
+
+                    synchronized (this) {
+                        AbstractSnapshotFutureTask<?> task = lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+                        if (task != null) {
+                            // Task will also be removed from local map due to the listener on future done.
+                            task.cancel();
+
+                            log.info("Snapshot request has been cancelled due to another request received " +
+                                "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+                        }
+                    }
+
+                    AbstractSnapshotFutureTask<?> task = registerSnapshotTask(rqId,
+                        new SnapshotResponseRemoteFutureTask(cctx,
+                            nodeId,
+                            snpName,
+                            tmpWorkDir,
+                            ioFactory,
+                            rmtSndrFactory.apply(rqId, nodeId),
+                            reqMsg0.parts()));
+
+                    task.listen(f -> {
+                        if (f.error() == null)
+                            return;
+
+                        U.error(log, "Failed to process request of creating a snapshot " +
+                            "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error());
+
+                        try {
+                            cctx.gridIO().sendToCustomTopic(nodeId,
+                                DFLT_INITIAL_SNAPSHOT_TOPIC,
+                                new SnapshotResponseMessage(reqMsg0.requestId(), f.error().getMessage()),
+                                SYSTEM_POOL);
+                        }
+                        catch (IgniteCheckedException ex0) {
+                            U.error(log, "Fail to send the response message with processing snapshot request " +
+                                "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0);
+                        }
+                    });
+
+                    task.start();
+                }
+                else if (msg instanceof SnapshotResponseMessage) {
+                    SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg;
+
+                    RemoteSnapshotHandler task = active;
+
+                    if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+                        if (log.isInfoEnabled()) {
+                            log.info("A stale snapshot response message has been received. Will be ignored " +
+                                "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+                        }
+
+                        return;
+                    }
+
+                    if (respMsg0.errorMessage() != null) {
+                        task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+                            "on the remote node with an error: " + respMsg0.errorMessage()));
+                    }
+                }
+            }
+            catch (Throwable e) {
+                U.error(log, "Processing snapshot request from remote node fails with an error", e);
+
+                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEnd(UUID nodeId) {
+            RemoteSnapshotHandler task = active;
+
+            if (task == null)
+                return;
+
+            assert task.partsLeft.get() == 0 : task;
+            assert task.rmtNodeId.equals(nodeId);
+
+            log.info("Requested snapshot from remote node has been fully received " +

Review comment:
       Missing `log.isInfoEnabled()` check.
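
A minimal sketch of the guard, assuming a logger that exposes `isInfoEnabled()` and `info(String)` as used elsewhere in this diff. The `Log` interface below is a hypothetical stand-in so the example is self-contained, and the message text is illustrative only.

```java
/** Hypothetical illustration (not part of the PR) of guarding an info-level message. */
final class GuardedLogging {
    /** Minimal stand-in for a logger with a level check. */
    interface Log {
        boolean isInfoEnabled();

        void info(String msg);
    }

    /** Builds and logs the message only when the INFO level is enabled. */
    static void logReceived(Log log, Object task) {
        // Without the guard, the string concatenation (and task.toString()) runs even if INFO is off.
        if (log.isInfoEnabled())
            log.info("Snapshot has been received [task=" + task + ']');
    }
}
```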

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -907,8 +1190,9 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
 
         synchronized (this) {
             opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+        }
 
-            try {
+        try {
                 ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {

Review comment:
       Wrong indentation: the body of the `try` block was not dedented together with the `try` itself.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends IgniteCheckedException {

Review comment:
       This class is not used.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -980,6 +1263,102 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
         finishProcess(reqId, opCtx0.err.get());
     }
 
+    /**
+     * @param mgr Ignite snapshot manager.
+     * @param opCtx Snapshot operation context.
+     * @param srcDir Snapshot directory to copy from.
+     * @param targetDir Destination directory to copy to.
+     */
+    private static void copyLocalAsync(
+        IgniteSnapshotManager mgr,
+        SnapshotRestoreContext opCtx,
+        File srcDir,
+        File targetDir,
+        PartitionRestoreFuture partFut
+    ) {
+        File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
+        Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+        CompletableFuture.supplyAsync(() -> {
+                if (opCtx.stopChecker.getAsBoolean())
+                    throw new IgniteInterruptedException("The operation has been stopped on copy file: " + snpFile.getAbsolutePath());
+
+                if (Thread.interrupted())
+                    throw new IgniteInterruptedException("Thread has been interrupted: " + Thread.currentThread().getName());
+
+                if (!snpFile.exists()) {
+                    throw new IgniteException("Partition snapshot file doesn't exist [snpName=" + opCtx.snpName +
+                        ", snpDir=" + snpFile.getAbsolutePath() + ", name=" + snpFile.getName() + ']');
+                }
+
+                IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile, partFile.toFile(), snpFile.length());
+
+                return partFile;
+            }, mgr.snapshotExecutorService())
+            .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+            .whenComplete((r, t) -> {
+                if (t == null)
+                    partFut.complete(partFile);
+                else
+                    partFut.completeExceptionally(t);
+            });
+    }
+
+    /**
+     * @param affCache Affinity cache.
+     * @param node Cluster node to get assigned partitions.
+     * @return The set of partitions assigned to the given node.
+     */
+    private static <T> Set<T> nodeAffinityPartitions(
+        GridAffinityAssignmentCache affCache,
+        ClusterNode node,
+        IntFunction<T> factory
+    ) {
+        return IntStream.range(0, affCache.partitions())
+            .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
+            .mapToObj(factory)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * @param col Collection of sets to complete.
+     * @param ex Exception to set.
+     */
+    private static void completeListExceptionally(Collection<Set<PartitionRestoreFuture>> col, Throwable ex) {
+        col.stream()
+            .flatMap(Collection::stream)
+            .forEach(f -> f.completeExceptionally(ex));
+    }
+
+    /**
+     * @param cacheStartFut The cache started future to wrap exception if need.
+     * @param <T> Result future type.
+     * @return Future which completes with wrapped exception if it occurred.
+     */
+    private static <T> IgniteInternalFuture<T> chainCacheStartException(IgniteInternalFuture<T> cacheStartFut) {

Review comment:
       Not used

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
 
         /**
          * @param req Request to prepare cache group restore from the snapshot.
-         * @param dirs List of cache group names to restore from the snapshot.
          * @param cfgs Cache ID to configuration mapping.
          */
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
-            Map<Integer, StoredCacheData> cfgs) {
+        protected SnapshotRestoreContext(
+            SnapshotOperationRequest req,
+            DiscoCache discoCache,
+            Map<Integer, StoredCacheData> cfgs,
+            UUID locNodeId,
+            List<SnapshotMetadata> locMetas
+        ) {
             reqId = req.requestId();
             snpName = req.snapshotName();
-            nodes = new HashSet<>(req.nodes());
+            opNodeId = req.operationalNodeId();
+            this.discoCache = discoCache;
 
-            this.dirs = dirs;
             this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+        }
+
+        /**
+         * @return Required baseline nodeIds that must be alive to complete restore operation.
+         */
+        public Collection<UUID> nodes() {
+            return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+        }
+    }
+
+    /** */
+    private static class RestoreCacheStartException extends IgniteCheckedException {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** @param cause Error. */
+        public RestoreCacheStartException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache configurations on local node. */
+        private ArrayList<StoredCacheData> ccfgs;
+
+        /** Snapshot metadata files on local node. */
+        private ArrayList<SnapshotMetadata> metas;

Review comment:
       `ArrayList` -> `List`, final
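
A minimal sketch of what this suggestion could look like, offered as an illustration only and not as the code that was merged. The class name `RestoreOperationResponseSketch` and the accessor methods are hypothetical, and `String` stands in for `StoredCacheData` and `SnapshotMetadata` so the example compiles on its own. The fields are declared against the `List` interface and made `final`; the constructor copies its arguments into `ArrayList` so that the runtime type stays serializable.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the response class discussed in the review. */
final class RestoreOperationResponseSketch implements Serializable {
    /** Serial version uid. */
    private static final long serialVersionUID = 0L;

    /** Cache configurations on local node (String stands in for StoredCacheData). */
    private final List<String> ccfgs;

    /** Snapshot metadata on local node (String stands in for SnapshotMetadata). */
    private final List<String> metas;

    /**
     * @param ccfgs Cache configurations on local node.
     * @param metas Snapshot metadata on local node.
     */
    RestoreOperationResponseSketch(List<String> ccfgs, List<String> metas) {
        // Copy into ArrayList so the runtime list type is known to be serializable.
        this.ccfgs = new ArrayList<>(ccfgs);
        this.metas = new ArrayList<>(metas);
    }

    /** @return Read-only view of the cache configurations. */
    List<String> ccfgs() {
        return Collections.unmodifiableList(ccfgs);
    }

    /** @return Read-only view of the snapshot metadata. */
    List<String> metas() {
        return Collections.unmodifiableList(metas);
    }
}
```

Declaring the fields as `List` keeps callers independent of the concrete collection, while `final` guarantees the references are assigned exactly once in the constructor.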




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] Mmuzaf commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
Mmuzaf commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r752632349



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndRe
 
                 if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
                     if (exchActions != null) {
-                        Set<String> caches = exchActions.cachesToResetLostPartitions();
+                    Set<String> caches = exchActions.cachesToResetLostPartitions();

Review comment:
       I don't know why these changes are here, but this file does not differ between the `ignite-14744-b` and `master` branches.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9539: IGNITE-14744 Restore snapshot taken on different topologies

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r751393929



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1115,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, List<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, List<SnapshotMetadata>> e : metas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new ArrayList<>())) {
+                Map<Integer, Set<Integer>> parts = ofNullable(meta.partitions()).orElse(Collections.emptyMap());
+
+                for (Map.Entry<Integer, Set<Integer>> metaParts : parts.entrySet()) {
+                    for (Integer partId : metaParts.getValue()) {
+                        if (filter.test(metaParts.getKey(), partId)) {
+                            nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
+                                .computeIfAbsent(metaParts.getKey(), k -> new HashSet<>())
+                                .add(partId);
+                        }
+                    }
+                }
+            }
+        }
+
+        List<UUID> list = new ArrayList<>(nodeToSnp.keySet());
+        Collections.shuffle(list);

Review comment:
       The shuffle here has no effect, since all required partitions are already fetched from the first nodes in `metas`. As a result, the load is not evenly balanced.
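
The balancing concern can be illustrated with a small, self-contained sketch. It is not the fix applied in the pull request; the class `BalancedSnapshotAffinity` and its `assign` method are hypothetical, and the input is simplified to a single cache group (node ID to the partitions found in that node's snapshot files). Instead of shuffling the node list after partitions have already been claimed, each partition is given to the least-loaded node that holds it, and the shuffle only breaks ties.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;

/** Hypothetical sketch, not part of the pull request. */
final class BalancedSnapshotAffinity {
    /**
     * @param available Partitions that each node holds in its local snapshot files.
     * @return Partitions each node should supply; every partition appears exactly once.
     */
    static Map<UUID, Set<Integer>> assign(Map<UUID, Set<Integer>> available) {
        // Invert the input: partition -> nodes that can supply it.
        Map<Integer, List<UUID>> owners = new TreeMap<>();

        for (Map.Entry<UUID, Set<Integer>> e : available.entrySet()) {
            for (Integer part : e.getValue())
                owners.computeIfAbsent(part, p -> new ArrayList<>()).add(e.getKey());
        }

        Map<UUID, Set<Integer>> res = new HashMap<>();

        for (Map.Entry<Integer, List<UUID>> e : owners.entrySet()) {
            List<UUID> candidates = e.getValue();

            // Shuffling the candidates only breaks ties between equally loaded nodes.
            Collections.shuffle(candidates);

            UUID best = null;

            // Pick the candidate that currently supplies the fewest partitions.
            for (UUID node : candidates) {
                if (best == null || load(res, node) < load(res, best))
                    best = node;
            }

            res.computeIfAbsent(best, n -> new HashSet<>()).add(e.getKey());
        }

        return res;
    }

    /** @return Number of partitions already assigned to the given node. */
    private static int load(Map<UUID, Set<Integer>> res, UUID node) {
        Set<Integer> parts = res.get(node);

        return parts == null ? 0 : parts.size();
    }
}
```

With two nodes that both hold partitions 0 to 3, each node ends up supplying two partitions, whichever way the ties are broken.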

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1118,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param metas Map of snapshot metadata distribution across the cluster.
+     * @return Map of cache partitions per each node.
+     */
+    private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+        Map<UUID, ArrayList<SnapshotMetadata>> metas,
+        BiPredicate<Integer, Integer> filter
+    ) {
+        Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new ArrayList<>())) {

Review comment:
       No, it is not parameterized by `Object` and can be used here without problems.



