You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/10/14 10:15:09 UTC

[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Create incremental snapshot infrastructure (#10263)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
     new 914dc24b242 IGNITE-17613 Create incremental snapshot infrastructure (#10263)
914dc24b242 is described below

commit 914dc24b2423d93c2cc9dc5d235b2449e51a15e2
Author: Nikolay <ni...@apache.org>
AuthorDate: Fri Oct 14 13:14:57 2022 +0300

    IGNITE-17613 Create incremental snapshot infrastructure (#10263)
    
    Co-authored-by: Maksim Timonin <ti...@gmail.com>
---
 .../commandline/indexreader/IgniteIndexReader.java |  13 +-
 .../snapshot/SnapshotCreateCommand.java            |  12 +-
 .../snapshot/SnapshotCreateCommandOption.java      |   6 +-
 .../commandline/snapshot/SnapshotSubcommand.java   |   5 +-
 .../apache/ignite/util/GridCommandHandlerTest.java |   2 +-
 .../java/org/apache/ignite/IgniteSnapshot.java     |  10 +-
 .../org/apache/ignite/internal/cdc/CdcMain.java    |   4 +-
 .../processors/cache/GridLocalConfigManager.java   |  37 +-
 .../persistence/file/FilePageStoreManager.java     |   8 +
 .../snapshot/AbstractSnapshotFutureTask.java       |   7 +
 .../snapshot/IgniteSnapshotManager.java            | 446 ++++++++++++++++++---
 .../snapshot/IncrementalSnapshotFutureTask.java    | 125 ++++++
 .../IncrementalSnapshotFutureTaskResult.java       |  22 +
 .../snapshot/IncrementalSnapshotMetadata.java      |  82 ++++
 .../persistence/snapshot/SnapshotFutureTask.java   |   7 -
 .../persistence/snapshot/SnapshotMXBeanImpl.java   |  12 +-
 .../persistence/snapshot/SnapshotMetadata.java     |   3 +-
 .../snapshot/SnapshotOperationRequest.java         |  24 +-
 .../snapshot/SnapshotRestoreProcess.java           |  10 +-
 .../cache/persistence/wal/FileDescriptor.java      |  18 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |   8 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  47 +++
 .../apache/ignite/internal/util/lang/GridFunc.java |   4 +-
 .../visor/snapshot/VisorSnapshotCreateTask.java    |   7 +-
 .../visor/snapshot/VisorSnapshotCreateTaskArg.java |  14 +-
 .../snapshot/VisorSnapshotRestoreTaskArg.java      |   2 +-
 .../org/apache/ignite/mxbean/SnapshotMXBean.java   |  15 +
 .../snapshot/AbstractSnapshotSelfTest.java         |  10 +
 .../snapshot/IgniteClusterSnapshotHandlerTest.java |   2 +-
 .../IgniteClusterSnapshotRestoreSelfTest.java      |   4 +-
 .../snapshot/IgniteClusterSnapshotSelfTest.java    |  51 ++-
 .../snapshot/IgniteSnapshotMXBeanTest.java         |   9 +
 .../snapshot/IncrementalSnapshotTest.java          | 264 ++++++++++++
 .../internal/util/lang/GridFuncSelfTest.java       |  54 +++
 .../ignite/testsuites/IgniteSnapshotTestSuite.java |   4 +-
 .../ignite/testsuites/IgniteUtilSelfTestSuite.java |   2 +
 ...ridCommandHandlerClusterByClassTest_help.output |  13 +-
 ...andHandlerClusterByClassWithSSLTest_help.output |  13 +-
 38 files changed, 1233 insertions(+), 143 deletions(-)

diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
index 1842266680a..f0dcf4510fc 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/indexreader/IgniteIndexReader.java
@@ -18,10 +18,7 @@
 package org.apache.ignite.internal.commandline.indexreader;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,6 +59,7 @@ import org.apache.ignite.internal.commandline.indexreader.ScanContext.PagesStati
 import org.apache.ignite.internal.commandline.systemview.SystemViewCommand;
 import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
 import org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
@@ -118,7 +116,6 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageIndex;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
 import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
@@ -285,13 +282,13 @@ public class IgniteIndexReader implements AutoCloseable {
         for (int i = 0; i < partCnt; i++)
             partStores[i] = filePageStore(i, FLAG_DATA, storeFactory);
 
-        Arrays.stream(root.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME))).forEach(f -> {
-            try (ObjectInputStream stream = new ObjectInputStream(Files.newInputStream(f.toPath()))) {
-                StoredCacheData data = (StoredCacheData)stream.readObject();
+        Arrays.stream(FilePageStoreManager.cacheDataFiles(root)).forEach(f -> {
+            try {
+                StoredCacheData data = GridLocalConfigManager.readCacheData(f, null, null);
 
                 storedCacheData.put(CU.cacheId(data.config().getName()), data);
             }
-            catch (ClassNotFoundException | IOException e) {
+            catch (IgniteCheckedException e) {
                 log.log(WARNING, "Can't read stored cache data. Inline for this cache will not be analyzed [f=" + f.getName() + ']', e);
             }
         });
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
index 7582bd3fcfb..f346beebe75 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommand.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.visor.snapshot.VisorSnapshotCreateTaskArg;
 import static org.apache.ignite.internal.commandline.CommandList.SNAPSHOT;
 import static org.apache.ignite.internal.commandline.CommandLogger.optional;
 import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.DESTINATION;
+import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.INCREMENTAL;
 import static org.apache.ignite.internal.commandline.snapshot.SnapshotCreateCommandOption.SYNC;
 
 /**
@@ -45,6 +46,7 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
         String snpName = argIter.nextArg("Expected snapshot name.");
         String snpPath = null;
         boolean sync = false;
+        boolean incremental = false;
 
         while (argIter.hasNextSubArg()) {
             String arg = argIter.nextArg(null);
@@ -72,10 +74,15 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
 
                 sync = true;
             }
+            else if (option == INCREMENTAL) {
+                if (incremental)
+                    throw new IllegalArgumentException(INCREMENTAL.argName() + " arg specified twice.");
 
+                incremental = true;
+            }
         }
 
-        cmdArg = new VisorSnapshotCreateTaskArg(snpName, snpPath, sync);
+        cmdArg = new VisorSnapshotCreateTaskArg(snpName, snpPath, sync, incremental);
     }
 
     /** {@inheritDoc} */
@@ -84,8 +91,9 @@ public class SnapshotCreateCommand extends SnapshotSubcommand {
 
         params.put(DESTINATION.argName() + " " + DESTINATION.arg(), DESTINATION.description());
         params.put(SYNC.argName(), SYNC.description());
+        params.put(INCREMENTAL.argName(), INCREMENTAL.description());
 
         usage(log, "Create cluster snapshot:", SNAPSHOT, params, name(), SNAPSHOT_NAME_ARG,
-            optional(DESTINATION.argName(), DESTINATION.arg()), optional(SYNC.argName()));
+            optional(DESTINATION.argName(), DESTINATION.arg()), optional(SYNC.argName()), optional(INCREMENTAL.argName()));
     }
 }
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
index a6d216d1781..23761a0e5a2 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotCreateCommandOption.java
@@ -30,7 +30,11 @@ public enum SnapshotCreateCommandOption implements CommandArg {
 
     /** Snapshot directory path. */
     DESTINATION("--dest", "path", "Path to the directory where the snapshot will be saved. If not specified, " +
-        "the default configured snapshot directory will be used.");
+        "the default configured snapshot directory will be used."),
+
+    /** Incremental snapshot flag. */
+    INCREMENTAL("--incremental", null, "Create an incremental snapshot for previously created full snapshot. " +
+        "Full snapshot must be accessible via --dest and snapshot_name.");
 
     /** Name. */
     private final String name;
diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
index 44f9e7d73b0..2d5b3c331fe 100644
--- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
+++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/snapshot/SnapshotSubcommand.java
@@ -87,7 +87,10 @@ public abstract class SnapshotSubcommand extends AbstractCommand<Object> {
      * @return General usage options.
      */
     protected Map<String, String> generalUsageOptions() {
-        return F.asMap(SNAPSHOT_NAME_ARG, "Snapshot name.");
+        return F.asMap(
+            SNAPSHOT_NAME_ARG,
+            "Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided."
+        );
     }
 
     /**
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index fe1be09f520..f092fb6c40e 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -3080,7 +3080,7 @@ public class GridCommandHandlerTest extends GridCommandHandlerClusterPerMethodAb
 
         // Invalid command syntax check.
         assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "create", snpName, "blah"));
-        assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --sync, --dest.");
+        assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --sync, --dest, --incremental.");
 
         assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "create", snpName, "--sync", "blah"));
         assertContains(log, testOut.toString(), "Invalid argument: blah.");
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
index 5945805b076..08e23d9f022 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java
@@ -38,10 +38,18 @@ public interface IgniteSnapshot {
      * Create a consistent copy of all persistence cache groups from the whole cluster.
      *
      * @param name Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
-     * @return Future which will be completed when a process ends.
+     * @return Future which will be completed when the process ends.
      */
     public IgniteFuture<Void> createSnapshot(String name);
 
+    /**
+     * Create an incremental snapshot for a given full snapshot.
+     *
+     * @param fullSnapshot Snapshot unique name which satisfies the following name pattern [a-zA-Z0-9_].
+     * @return Future which will be completed when the process ends.
+     */
+    public IgniteFuture<Void> createIncrementalSnapshot(String fullSnapshot);
+
     /**
      * Cancel running snapshot operation. All intermediate results of cancelled snapshot operation will be deleted.
      * If snapshot already created this command will have no effect.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 91abaa7faa1..e56c3dd0899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
@@ -77,7 +78,6 @@ import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer
 import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
-import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
@@ -682,7 +682,7 @@ public class CdcMain implements Runnable {
                 .filter(File::exists)
                 // Cache group directory can contain several cache data files.
                 // See GridLocalConfigManager#cacheConfigurationFile(CacheConfiguration<?, ?>)
-                .flatMap(cacheDir -> Arrays.stream(cacheDir.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME))))
+                .flatMap(cacheDir -> Arrays.stream(FilePageStoreManager.cacheDataFiles(cacheDir)))
                 .map(f -> {
                     try {
                         CdcCacheEvent evt = GridLocalConfigManager.readCacheData(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
index 6cc56335480..787013bbd74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java
@@ -20,10 +20,9 @@ package org.apache.ignite.internal.processors.cache;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -62,6 +61,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
+import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.Files.newDirectoryStream;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
@@ -197,18 +197,26 @@ public class GridLocalConfigManager {
 
     /**
      * @param conf File with stored cache data.
+     * @param marshaller Marshaller.
+     * @param cfg Ignite configuration.
      * @return Cache data.
      * @throws IgniteCheckedException If failed.
      */
     public static StoredCacheData readCacheData(
         File conf,
-        Marshaller marshaller,
-        IgniteConfiguration cfg
+        @Nullable Marshaller marshaller,
+        @Nullable IgniteConfiguration cfg
     ) throws IgniteCheckedException {
-        try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
+        try (InputStream stream = new BufferedInputStream(Files.newInputStream(conf.toPath()))) {
+            if (marshaller == null || cfg == null) {
+                try (ObjectInputStream ostream = new ObjectInputStream(stream)) {
+                    return (StoredCacheData)ostream.readObject();
+                }
+            }
+
             return marshaller.unmarshal(stream, U.resolveClassLoader(cfg));
         }
-        catch (IgniteCheckedException | IOException e) {
+        catch (IgniteCheckedException | IOException | ClassNotFoundException e) {
             throw new IgniteCheckedException("An error occurred during cache configuration loading from file [file=" +
                 conf.getAbsolutePath() + "]", e);
         }
@@ -221,7 +229,7 @@ public class GridLocalConfigManager {
      */
     public void writeCacheData(StoredCacheData cacheData, File conf) throws IgniteCheckedException {
         // Pre-existing file will be truncated upon stream open.
-        try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(conf))) {
+        try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(conf.toPath()))) {
             marshaller.marshal(cacheData, stream);
         }
         catch (IOException e) {
@@ -355,9 +363,18 @@ public class GridLocalConfigManager {
      * @param lsnr Instance of listener to add.
      */
     public void addConfigurationChangeListener(BiConsumer<String, File> lsnr) {
-        assert chgLock.isWriteLockedByCurrentThread();
+        if (chgLock.isWriteLockedByCurrentThread())
+            lsnrs.add(lsnr);
+        else {
+            chgLock.writeLock().lock();
 
-        lsnrs.add(lsnr);
+            try {
+                lsnrs.add(lsnr);
+            }
+            finally {
+                chgLock.writeLock().unlock();
+            }
+        }
     }
 
     /**
@@ -484,7 +501,7 @@ public class GridLocalConfigManager {
      * @param ccfg Cache configuration.
      * @return Cache configuration file with respect to {@link CacheConfiguration#getGroupName} value.
      */
-    private File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
+    public File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
         File cacheWorkDir = cacheWorkDir(ccfg);
 
         return ccfg.getGroupName() == null ? new File(cacheWorkDir, CACHE_DATA_FILENAME) :
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index cf6f23515ca..c3a6a252cc1 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -943,6 +943,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
             throw new IgniteException("Directory doesn't match the cache or cache group prefix: " + dir);
     }
 
+    /**
+     * @param root Root directory.
+     * @return Array of cache data files.
+     */
+    public static File[] cacheDataFiles(File root) {
+        return root.listFiles(f -> f.getName().endsWith(CACHE_DATA_FILENAME));
+    }
+
     /** {@inheritDoc} */
     @Override public boolean hasIndexStore(int grpId) {
         return !grpsWithoutIdx.contains(grpId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
index cb9486a96d7..7ccc2f3944c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
@@ -121,6 +121,13 @@ abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
         return reqId;
     }
 
+    /**
+     * @return Set of cache groups included into snapshot operation.
+     */
+    public Set<Integer> affectedCacheGroups() {
+        return parts.keySet();
+    }
+
     /**
      * Initiates snapshot task.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e27d6161d53..cd894261b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -66,6 +66,7 @@ import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -106,6 +107,7 @@ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -308,6 +310,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /** Total snapshot files count which receiver should expect to receive. */
     private static final String SNP_PARTITIONS_CNT = "partsCnt";
 
+    /** Incremental snapshots directory name. */
+    public static final String INC_SNP_DIR = "increments";
+
+    /** Pattern for incremental snapshot directory names. */
+    public static final Pattern INC_SNP_NAME_PATTERN = U.fixedLengthNumberNamePattern(null);
+
     /**
      * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s.
      * It is important to have only one buffer per thread (instead of creating each buffer per
@@ -506,7 +514,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             String.class,
             "The error message of last started cluster snapshot request which fail with an error. " +
                 "This value will be empty if last snapshot request has been completed successfully.");
-        mreg.register("LocalSnapshotNames", this::localSnapshotNames, List.class,
+        mreg.register("LocalSnapshotNames", () -> localSnapshotNames(null), List.class,
             "The list of names of all snapshots currently saved on the local node with respect to " +
                 "the configured via IgniteConfiguration snapshot working path.");
         mreg.register("LastRequestId", () -> Optional.ofNullable(lastSeenSnpFut.rqId).map(UUID::toString).orElse(""),
@@ -572,7 +580,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             SNAPSHOT_SYS_VIEW,
             SNAPSHOT_SYS_VIEW_DESC,
             new SnapshotViewWalker(),
-            () -> F.flatCollections(F.transform(localSnapshotNames(), name -> readSnapshotMetadatas(name, null))),
+            () -> F.flatCollections(F.transform(localSnapshotNames(null), name -> readSnapshotMetadatas(name, null))),
             this::snapshotViewSupplier);
     }
 
@@ -704,6 +712,38 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         return snpPath == null ? new File(locSnpDir, snpName) : new File(snpPath, snpName);
     }
 
+    /**
+     * Returns path to specific incremental snapshot.
+     * For example, {@code "work/snapshots/mybackup/increments/node01/0001"}.
+     *
+     * @param snpName Snapshot name.
+     * @param snpPath Snapshot directory path.
+     * @param incIdx Increment index.
+     * @return Local snapshot directory where snapshot files are located.
+     */
+    public File incrementalSnapshotLocalDir(String snpName, @Nullable String snpPath, int incIdx) {
+        return Paths.get(
+            incrementalSnapshotsLocalRootDir(snpName, snpPath).getAbsolutePath(),
+            U.fixedLengthNumberName(incIdx, null)
+        ).toFile();
+    }
+
+    /**
+     * Returns root folder for incremental snapshot.
+     * For example, {@code "work/snapshots/mybackup/increments/node01"}.
+     *
+     * @param snpName Snapshot name.
+     * @param snpPath Snapshot directory path.
+     * @return Local snapshot directory where snapshot files are located.
+     */
+    public File incrementalSnapshotsLocalRootDir(String snpName, @Nullable String snpPath) {
+        return Paths.get(
+            snapshotLocalDir(snpName, snpPath).getAbsolutePath(),
+            INC_SNP_DIR,
+            pdsSettings.folderName()
+        ).toFile();
+    }
+
     /**
      * @param snpName Snapshot name.
      * @return Local snapshot directory for snapshot with given name.
@@ -773,6 +813,110 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 "on the local node [missed=" + leftGrps + ", nodeId=" + cctx.localNodeId() + ']'));
         }
 
+        if (req.incremental()) {
+            SnapshotMetadata meta = readSnapshotMetadata(new File(
+                snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+                snapshotMetaFileName(cctx.localNode().consistentId().toString())
+            ));
+
+            try {
+                checkIncrementalCanBeCreated(req.snapshotName(), req.snapshotPath(), meta);
+            }
+            catch (IgniteCheckedException | IOException e) {
+                return new GridFinishedFuture<>(e);
+            }
+
+            return initLocalIncrementalSnapshot(req, meta);
+        }
+        else
+            return initLocalFullSnapshot(req, grpIds, withMetaStorage);
+    }
+
+    /**
+     * @param req Request on snapshot creation.
+     * @param meta Full snapshot metadata.
+     * @return Future which will be completed when a snapshot has been started.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalIncrementalSnapshot(
+        SnapshotOperationRequest req,
+        SnapshotMetadata meta
+    ) {
+        File incSnpDir = incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex());
+
+        IgniteInternalFuture<SnapshotOperationResponse> task0 = registerTask(req.snapshotName(), new IncrementalSnapshotFutureTask(
+            cctx,
+            req.operationalNodeId(),
+            req.requestId(),
+            meta,
+            req.snapshotPath(),
+            req.incrementIndex(),
+            tmpWorkDir,
+            ioFactory
+        )).chain(fut -> {
+            if (fut.error() != null)
+                throw F.wrap(fut.error());
+
+            assert incSnpDir.exists() : "Incremental snapshot directory must exists";
+
+            IncrementalSnapshotMetadata incMeta = new IncrementalSnapshotMetadata(
+                req.requestId(),
+                req.snapshotName(),
+                req.incrementIndex(),
+                cctx.localNode().consistentId().toString(),
+                pdsSettings.folderName(),
+                null /* WAL Pointer for CUT record goes here. */
+            );
+
+            writeSnapshotMetafile(
+                new File(incSnpDir, incrementalSnapshotMetaFileName(req.incrementIndex())),
+                incMeta
+            );
+
+            return new SnapshotOperationResponse();
+        });
+
+        if (task0.isDone())
+            return task0;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Incremental snapshot operation submited for execution" +
+                "[snpName=" + req.snapshotName() + ", incIdx=" + req.incrementIndex());
+        }
+
+        clusterSnpReq = req;
+
+        cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+            SnapshotOperationRequest snpReq = clusterSnpReq;
+
+            AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
+
+            if (task == null)
+                return;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Incremental snapshot operation started" +
+                    "[snpName=" + req.snapshotName() + ", incIdx=" + req.incrementIndex());
+            }
+
+            writeSnapshotDirectoryToMetastorage(incSnpDir);
+
+            task.start();
+        });
+
+        return task0;
+    }
+
+    /**
+     * @param req Request
+     * @param grpIds Groups.
+     * @param withMetaStorage Flag to include metastorage.
+     * @return Create snapshot future.
+     */
+    private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
+        SnapshotOperationRequest req,
+        List<Integer> grpIds,
+        boolean withMetaStorage
+    ) {
         Map<Integer, Set<Integer>> parts = new HashMap<>();
 
         // Prepare collection of pairs group and appropriate cache partition to be snapshot.
@@ -815,11 +959,6 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
                 File snpDir = snapshotLocalDir(req.snapshotName(), req.snapshotPath());
 
-                File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString()));
-
-                if (smf.exists())
-                    throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
-
                 snpDir.mkdirs();
 
                 SnapshotFutureTaskResult res = (SnapshotFutureTaskResult)fut.result();
@@ -836,31 +975,48 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                     cctx.gridConfig().getEncryptionSpi().masterKeyDigest()
                 );
 
-                try (OutputStream out = Files.newOutputStream(smf.toPath())) {
-                    byte[] bytes = U.marshal(marsh, meta);
-                    int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
-
-                    for (int off = 0; off < bytes.length; off += blockSize) {
-                        int len = Math.min(blockSize, bytes.length - off);
-
-                        transferRateLimiter.acquire(len);
-
-                        out.write(bytes, off, len);
-                    }
-                }
-
-                log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
+                writeSnapshotMetafile(
+                    new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())),
+                    meta
+                );
 
                 SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpDir);
 
                 return new SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
             }
-            catch (IOException | IgniteCheckedException e) {
+            catch (IgniteCheckedException e) {
                 throw F.wrap(e);
             }
         });
     }
 
+    /**
+     * @param smf File to write to.
+     * @param meta Snapshot meta information.
+     */
+    private <M extends Serializable> void writeSnapshotMetafile(File smf, M meta) {
+        if (smf.exists())
+            throw new GridClosureException(new IgniteException("Snapshot metafile must not exist: " + smf.getAbsolutePath()));
+
+        try (OutputStream out = Files.newOutputStream(smf.toPath())) {
+            byte[] bytes = U.marshal(marsh, meta);
+            int blockSize = SNAPSHOT_LIMITED_TRANSFER_BLOCK_SIZE_BYTES;
+
+            for (int off = 0; off < bytes.length; off += blockSize) {
+                int len = Math.min(blockSize, bytes.length - off);
+
+                transferRateLimiter.acquire(len);
+
+                out.write(bytes, off, len);
+            }
+        }
+        catch (IOException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
+    }
+
     /**
      * @param id Request id.
      * @param res Results.
@@ -987,7 +1143,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             if (req.error() != null) {
                 snpReq.error(req.error());
 
-                deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName());
+                if (req.incremental())
+                    U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex()));
+                else
+                    deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName());
             }
 
             removeLastMetaStorageKey();
@@ -1109,7 +1268,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     /**
      * @return List of all known snapshots on the local node.
      */
-    public List<String> localSnapshotNames() {
+    public List<String> localSnapshotNames(@Nullable String snpPath) {
         if (cctx.kernalContext().clientNode())
             throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
 
@@ -1117,12 +1276,41 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             return Collections.emptyList();
 
         synchronized (snpOpMux) {
-            return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+            File[] dirs = (snpPath == null ? locSnpDir : new File(snpPath)).listFiles(File::isDirectory);
+
+            if (dirs == null)
+                return Collections.emptyList();
+
+            return Arrays.stream(dirs)
                 .map(File::getName)
                 .collect(Collectors.toList());
         }
     }
 
+    /**
+     * @param snpName Full snapshot name.
+     * @param snpPath Snapshot path.
+     * @return Maximum existing incremental snapshot index.
+     */
+    private int maxLocalIncrementSnapshot(String snpName, @Nullable String snpPath) {
+        if (cctx.kernalContext().clientNode())
+            throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation.");
+
+        synchronized (snpOpMux) {
+            File[] incDirs = incrementalSnapshotsLocalRootDir(snpName, snpPath).listFiles(File::isDirectory);
+
+            if (incDirs == null)
+                return 0;
+
+            return Arrays.stream(incDirs)
+                .map(File::getName)
+                .filter(name -> INC_SNP_NAME_PATTERN.matcher(name).matches())
+                .mapToInt(Integer::parseInt)
+                .max()
+                .orElse(0);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> cancelSnapshot(String name) {
         return new IgniteFutureImpl<>(cancelSnapshot0(name).chain(f -> null));
@@ -1505,7 +1693,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> createSnapshot(String name) {
-        return createSnapshot(name, null);
+        return createSnapshot(name, null, false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> createIncrementalSnapshot(String name) {
+        return createSnapshot(name, null, true);
     }
 
     /**
@@ -1515,7 +1708,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
      * @param snpPath Snapshot directory path.
      * @return Future which will be completed when a process ends.
      */
-    public IgniteFutureImpl<Void> createSnapshot(String name, @Nullable String snpPath) {
+    public IgniteFutureImpl<Void> createSnapshot(String name, @Nullable String snpPath, boolean incremental) {
         A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
         A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
 
@@ -1546,7 +1739,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
                 return new IgniteSnapshotFutureImpl(cctx.kernalContext().closure()
                     .callAsyncNoFailover(BALANCE,
-                        new CreateSnapshotCallable(name),
+                        new CreateSnapshotCallable(name, incremental),
                         Collections.singletonList(crd),
                         false,
                         0,
@@ -1554,6 +1747,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             }
 
             ClusterSnapshotFuture snpFut0;
+            int incIdx = -1;
 
             synchronized (snpOpMux) {
                 if (clusterSnpFut != null && !clusterSnpFut.isDone()) {
@@ -1565,10 +1759,20 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 if (clusterSnpReq != null)
                     throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.");
 
-                if (localSnapshotNames().contains(name)) {
-                    throw new IgniteException(
-                        "Create snapshot request has been rejected. Snapshot with given name already exists on local node."
-                    );
+                boolean snpExists = localSnapshotNames(snpPath).contains(name);
+
+                if (!incremental && snpExists) {
+                    throw new IgniteException("Create snapshot request has been rejected. " +
+                        "Snapshot with given name already exists on local node.");
+                }
+
+                if (incremental) {
+                    if (!snpExists) {
+                        throw new IgniteException("Create incremental snapshot request has been rejected. " +
+                                "Base snapshot with given name doesn't exist on local node.");
+                    }
+
+                    incIdx = maxLocalIncrementSnapshot(name, snpPath) + 1;
                 }
 
                 if (isRestoring()) {
@@ -1602,10 +1806,21 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             Set<UUID> bltNodeIds =
                 new HashSet<>(F.viewReadOnly(srvNodes, F.node2id(), (node) -> CU.baselineNode(node, clusterState)));
 
-            startSnpProc.start(snpFut0.rqId,
-                new SnapshotOperationRequest(snpFut0.rqId, cctx.localNodeId(), name, snpPath, grps, bltNodeIds));
-
-            String msg = "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + ']';
+            startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest(
+                snpFut0.rqId,
+                cctx.localNodeId(),
+                name,
+                snpPath,
+                grps,
+                bltNodeIds,
+                incremental,
+                incIdx
+            ));
+
+            String msg =
+                "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps +
+                    (incremental ? "" : (", incremental=true, index=" + incIdx)) +
+                ']';
 
             recordSnapshotEvent(name, msg, EVT_CLUSTER_SNAPSHOT_STARTED);
 
@@ -1668,7 +1883,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         String snpName = (String)metaStorage.read(SNP_RUNNING_KEY);
         String snpDirName = snpName == null ? (String)metaStorage.read(SNP_RUNNING_DIR_KEY) : null;
 
-        File snpDir = snpName != null ? snapshotLocalDir(snpName, null) : snpDirName != null ? new File(snpDirName) : null;
+        File snpDir = snpName != null
+            ? snapshotLocalDir(snpName, null)
+            : snpDirName != null
+                ? new File(snpDirName)
+                : null;
 
         if (snpDir == null)
             return;
@@ -1678,7 +1897,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         for (File tmp : snapshotTmpDir().listFiles())
             U.delete(tmp);
 
-        deleteSnapshot(snpDir, pdsSettings.folderName());
+        if (INC_SNP_NAME_PATTERN.matcher(snpDir.getName()).matches() && snpDir.getAbsolutePath().contains(INC_SNP_DIR))
+            U.delete(snpDir);
+        else
+            deleteSnapshot(snpDir, pdsSettings.folderName());
 
         if (log.isInfoEnabled()) {
             log.info("Previous attempt to create snapshot fail due to the local node crash. All resources " +
@@ -1703,6 +1925,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
 
         SnapshotOperationRequest snpReq = clusterSnpReq;
 
+        if (snpReq.incremental())
+            return;
+
         AbstractSnapshotFutureTask<?> task = locSnpTasks.get(snpReq.snapshotName());
 
         if (task == null)
@@ -1774,7 +1999,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         for (AbstractSnapshotFutureTask<?> sctx : F.view(locSnpTasks.values(), t -> t instanceof SnapshotFutureTask)) {
             Set<Integer> retain = new HashSet<>(grps);
 
-            retain.retainAll(((SnapshotFutureTask)sctx).affectedCacheGroups());
+            retain.retainAll(sctx.affectedCacheGroups());
 
             if (!retain.isEmpty()) {
                 sctx.acceptException(new IgniteCheckedException("Snapshot has been interrupted due to some of the required " +
@@ -1791,6 +2016,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT;
     }
 
+    /**
+     * @param incIdx Increment index.
+     * @return Snapshot metadata file name.
+     */
+    public static String incrementalSnapshotMetaFileName(int incIdx) {
+        return U.fixedLengthNumberName(incIdx, SNAPSHOT_METAFILE_EXT);
+    }
+
     /**
      * @param snpDir The full path to the snapshot files.
      * @param folderName The node folder name, usually it's the same as the U.maskForFileName(consistentId).
@@ -2015,6 +2248,24 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             rqId);
     }
 
+    /** @param snpLocDir Snapshot local directory. */
+    public void writeSnapshotDirectoryToMetastorage(File snpLocDir) {
+        cctx.database().checkpointReadLock();
+
+        try {
+            assert metaStorage != null && metaStorage.read(SNP_RUNNING_DIR_KEY) == null :
+                "The previous snapshot hasn't been completed correctly";
+
+            metaStorage.write(SNP_RUNNING_DIR_KEY, snpLocDir.getAbsolutePath());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            cctx.database().checkpointReadUnlock();
+        }
+    }
+
     /** Snapshot finished successfully or already restored. Key can be removed. */
     private void removeLastMetaStorageKey() throws IgniteCheckedException {
         cctx.database().checkpointReadLock();
@@ -2178,6 +2429,102 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         return new IgniteFutureImpl<>(cctx.kernalContext().task().execute(taskCls, snpName));
     }
 
+    /**
+     * Checks that incremental snapshot can be created for given full snapshot and current cluster state.
+     *
+     * @param name Full snapshot name.
+     * @param snpPath Snapshot path.
+     * @param meta Full snapshot metadata.
+     */
+    private void checkIncrementalCanBeCreated(
+        String name,
+        @Nullable String snpPath,
+        SnapshotMetadata meta
+    ) throws IgniteCheckedException, IOException {
+        File snpDir = snapshotLocalDir(name, snpPath);
+
+        Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
+            .stream()
+            .map(node -> node.consistentId().toString())
+            .collect(Collectors.toSet());
+
+        for (String consId : meta.baselineNodes()) {
+            if (!aliveNodesConsIds.contains(consId)) {
+                throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                    "Node from full snapshot offline [consistentId=" + consId + ']');
+            }
+        }
+
+        File rootSnpCachesDir = new File(snpDir, databaseRelativePath(meta.folderName()));
+
+        for (int grpId : meta.cacheGroupIds()) {
+            if (grpId == METASTORAGE_CACHE_ID)
+                continue;
+
+            CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grpId);
+
+            if (gctx == null) {
+                throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                    "Cache group destroyed [groupId=" + grpId + ']');
+            }
+
+            if (gctx.config().isEncryptionEnabled()) {
+                throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                    "Encrypted cache groups not supported [groupId=" + grpId + ']');
+            }
+
+            List<File> snpCacheDir =
+                cacheDirectories(rootSnpCachesDir, grpName -> gctx.cacheOrGroupName().equals(grpName));
+
+            if (snpCacheDir.isEmpty()) {
+                throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                    "Cache group directory not found [groupId=" + grpId + ']');
+            }
+
+            assert snpCacheDir.size() == 1 : "Single snapshot cache directory must be found";
+
+            for (File snpDataFile : FilePageStoreManager.cacheDataFiles(snpCacheDir.get(0))) {
+                StoredCacheData snpCacheData = GridLocalConfigManager.readCacheData(
+                    snpDataFile,
+                    MarshallerUtils.jdkMarshaller(cctx.kernalContext().igniteInstanceName()),
+                    cctx.kernalContext().config()
+                );
+
+                byte[] snpCacheDataBytes = Files.readAllBytes(snpDataFile.toPath());
+
+                File nodeDataFile = new File(snpDataFile.getAbsolutePath().replace(
+                    rootSnpCachesDir.getAbsolutePath(),
+                    pdsSettings.persistentStoreNodePath().getAbsolutePath()
+                ));
+
+                if (!nodeDataFile.exists()) {
+                    throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                        "Cache destroyed [cacheId=" + snpCacheData.cacheId() +
+                        ", cacheName=" + snpCacheData.config().getName() + ']');
+                }
+
+                byte[] nodeCacheDataBytes = Files.readAllBytes(nodeDataFile.toPath());
+
+                if (!Arrays.equals(snpCacheDataBytes, nodeCacheDataBytes)) {
+                    throw new IgniteCheckedException(
+                        cacheChangedException(snpCacheData.cacheId(), snpCacheData.config().getName())
+                    );
+                }
+            }
+        }
+    }
+
+    /**
+     * Throw cache changed exception.
+     *
+     * @param cacheId Cache id.
+     * @param name Cache name.
+     */
+    public static String cacheChangedException(int cacheId, String name) {
+        return "Create incremental snapshot request has been rejected. " +
+            "Cache changed [cacheId=" + cacheId + ", cacheName=" + name + ']';
+    }
+
     /**
      * @param meta Snapshot metadata.
      * @return Snapshot view.
@@ -3138,22 +3485,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                     "[snpName=" + snpLocDir.getName() + ", absPath=" + dbDir.getAbsolutePath() + ']');
             }
 
-            cctx.database().checkpointReadLock();
+            writeSnapshotDirectoryToMetastorage(snpLocDir);
 
             try {
-                assert metaStorage != null && metaStorage.read(SNP_RUNNING_DIR_KEY) == null :
-                    "The previous snapshot hasn't been completed correctly";
-
-                metaStorage.write(SNP_RUNNING_DIR_KEY, snpLocDir.getAbsolutePath());
-
                 U.ensureDirectory(dbDir, "snapshot work directory for a local snapshot sender", log);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
             }
-            finally {
-                cctx.database().checkpointReadUnlock();
-            }
         }
 
         /** {@inheritDoc} */
@@ -3434,6 +3773,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         /** Snapshot name. */
         private final String snpName;
 
+        /** Incremental flag. */
+        private final boolean incremental;
+
         /** Auto-injected grid instance. */
         @IgniteInstanceResource
         private transient IgniteEx ignite;
@@ -3441,13 +3783,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         /**
          * @param snpName Snapshot name.
          */
-        public CreateSnapshotCallable(String snpName) {
+        public CreateSnapshotCallable(String snpName, boolean incremental) {
             this.snpName = snpName;
+            this.incremental = incremental;
         }
 
         /** {@inheritDoc} */
         @Override public Void call() throws Exception {
-            ignite.snapshot().createSnapshot(snpName).get();
+            if (incremental)
+                ignite.snapshot().createIncrementalSnapshot(snpName).get();
+            else
+                ignite.snapshot().createSnapshot(snpName).get();
 
             return null;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
new file mode 100644
index 00000000000..aa9e2208c05
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class IncrementalSnapshotFutureTask
+    extends AbstractSnapshotFutureTask<IncrementalSnapshotFutureTaskResult>
+    implements BiConsumer<String, File> {
+    /** Index of incremental snapshot. */
+    private final int incIdx;
+
+    /** Snapshot path. */
+    private final @Nullable String snpPath;
+
+    /** Metadata of the full snapshot. */
+    private final Set<Integer> affectedCacheGrps;
+
+    /** */
+    public IncrementalSnapshotFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        UUID reqNodeId,
+        SnapshotMetadata meta,
+        String snpPath,
+        int incIdx,
+        File tmpWorkDir,
+        FileIOFactory ioFactory
+    ) {
+        super(
+            cctx,
+            srcNodeId,
+            reqNodeId,
+            meta.snapshotName(),
+            tmpWorkDir,
+            ioFactory,
+            new SnapshotSender(
+                cctx.logger(IncrementalSnapshotFutureTask.class),
+                cctx.kernalContext().pools().getSnapshotExecutorService()
+            ) {
+                @Override protected void init(int partsCnt) {
+                    // No-op.
+                }
+
+                @Override protected void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) {
+                    // No-op.
+                }
+
+                @Override protected void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) {
+                    // No-op.
+                }
+            },
+            null
+        );
+
+        this.incIdx = incIdx;
+        this.snpPath = snpPath;
+        this.affectedCacheGrps = new HashSet<>(meta.cacheGroupIds());
+
+        cctx.cache().configManager().addConfigurationChangeListener(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> affectedCacheGroups() {
+        return affectedCacheGrps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean start() {
+        try {
+            File incSnpDir = cctx.snapshotMgr().incrementalSnapshotLocalDir(snpName, snpPath, incIdx);
+
+            if (!incSnpDir.mkdirs()) {
+                onDone(new IgniteException("Can't create snapshot directory[dir=" + incSnpDir.getAbsolutePath() + ']'));
+
+                return false;
+            }
+
+            onDone(new IncrementalSnapshotFutureTaskResult());
+
+            return true;
+        }
+        finally {
+            cctx.cache().configManager().removeConfigurationChangeListener(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acceptException(Throwable th) {
+        cctx.cache().configManager().removeConfigurationChangeListener(this);
+
+        onDone(th);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(String name, File file) {
+        onDone(new IgniteException(IgniteSnapshotManager.cacheChangedException(CU.cacheId(name), name)));
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java
new file mode 100644
index 00000000000..2ff37e6d3f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTaskResult.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+/** */
+class IncrementalSnapshotFutureTaskResult {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
new file mode 100644
index 00000000000..c262b070d82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Incremental snapshot metadata file.
+ *
+ * @see IgniteSnapshotManager#createIncrementalSnapshot(String)
+ */
+public class IncrementalSnapshotMetadata implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Unique snapshot request id. */
+    private final UUID rqId;
+
+    /** Snapshot name. */
+    private final String snpName;
+    
+    /** Increment index. */
+    private final int incIdx;
+
+    /** Consistent id of a node to which this metadata relates. */
+    private final String consId;
+
+    /**
+     * Directory related to the current consistent node id on which partition files are stored.
+     * For some of the cases, consId doesn't equal the directory name.
+     */
+    private final String folderName;
+
+    /** WAL pointer to consistent cut record. */
+    private final WALPointer cutPtr;
+
+    /**
+     * @param rqId Unique request id.
+     * @param snpName Snapshot name.
+     * @param consId Consistent id of a node to which this metadata relates.
+     * @param folderName Directory name which stores the data files.
+     * @param cutPtr Pointer to consistent cut record.
+     */
+    public IncrementalSnapshotMetadata(
+        UUID rqId,
+        String snpName,
+        int incIdx,
+        String consId,
+        String folderName,
+        WALPointer cutPtr
+    ) {
+        this.rqId = rqId;
+        this.snpName = snpName;
+        this.incIdx = incIdx;
+        this.consId = consId;
+        this.folderName = folderName;
+        this.cutPtr = cutPtr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IncrementalSnapshotMetadata.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index aa5326aa9af..4bacbb349f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -189,13 +189,6 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<SnapshotFutureTaskRe
         this.locBuff = locBuff;
     }
 
-    /**
-     * @return Set of cache groups included into snapshot operation.
-     */
-    public Set<Integer> affectedCacheGroups() {
-        return parts.keySet();
-    }
-
     /**
      * @param th An exception which occurred during snapshot processing.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
index 713ab2e46c0..7bea6bcb971 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMXBeanImpl.java
@@ -50,7 +50,15 @@ public class SnapshotMXBeanImpl implements SnapshotMXBean {
 
     /** {@inheritDoc} */
     @Override public void createSnapshot(String snpName, String snpPath) {
-        IgniteFuture<Void> fut = mgr.createSnapshot(snpName, F.isEmpty(snpPath) ? null : snpPath);
+        IgniteFuture<Void> fut = mgr.createSnapshot(snpName, F.isEmpty(snpPath) ? null : snpPath, false);
+
+        if (fut.isDone())
+            fut.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void createIncrementalSnapshot(String fullSnapshot, String fullSnapshotPath) {
+        IgniteFuture<Void> fut = mgr.createSnapshot(fullSnapshot, F.isEmpty(fullSnapshotPath) ? null : fullSnapshotPath, true);
 
         if (fut.isDone())
             fut.get();
@@ -88,6 +96,8 @@ public class SnapshotMXBeanImpl implements SnapshotMXBean {
 
         if (req != null) {
             return "Create snapshot operation is in progress [name=" + req.snapshotName() +
+                ", incremental=" + req.incremental() +
+                (req.incremental() ? (", incrementalIndex=" + req.incrementIndex()) : "") +
                 ", id=" + req.requestId() + ']';
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
index e1f8e4e2e6c..10ab16aa82b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java
@@ -85,7 +85,8 @@ public class SnapshotMetadata implements Serializable {
     @Nullable private final byte[] masterKeyDigest;
 
     /**
-     * F@param snpName Snapshot name.
+     * @param rqId Unique request id.
+     * @param snpName Snapshot name.
      * @param consId Consistent id of a node to which this metadata relates.
      * @param folderName Directory name which stores the data files.
      * @param pageSize Page size of stored snapshot data.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 14c72f661ea..23a55a6251c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -64,6 +64,12 @@ public class SnapshotOperationRequest implements Serializable {
     /** Operation start time. */
     private final long startTime;
 
+    /** If {@code true} then incremental snapshot requested. */
+    private final boolean incremental;
+
+    /** Index of incremental snapshot. */
+    private final int incIdx;
+
     /**
      * @param reqId Request ID.
      * @param opNodeId Operational node ID.
@@ -71,6 +77,8 @@ public class SnapshotOperationRequest implements Serializable {
      * @param snpPath Snapshot directory path.
      * @param grps List of cache group names.
      * @param nodes Baseline node IDs that must be alive to complete the operation.
+     * @param incremental {@code True} if incremental snapshot requested.
+     * @param incIdx Incremental snapshot index.
      */
     public SnapshotOperationRequest(
         UUID reqId,
@@ -78,7 +86,9 @@ public class SnapshotOperationRequest implements Serializable {
         String snpName,
         String snpPath,
         @Nullable Collection<String> grps,
-        Set<UUID> nodes
+        Set<UUID> nodes,
+        boolean incremental,
+        int incIdx
     ) {
         this.reqId = reqId;
         this.opNodeId = opNodeId;
@@ -86,6 +96,8 @@ public class SnapshotOperationRequest implements Serializable {
         this.grps = grps;
         this.nodes = nodes;
         this.snpPath = snpPath;
+        this.incremental = incremental;
+        this.incIdx = incIdx;
         startTime = U.currentTimeMillis();
     }
 
@@ -145,6 +157,16 @@ public class SnapshotOperationRequest implements Serializable {
         this.err = err;
     }
 
+    /** @return {@code True} if incremental snapshot requested. */
+    public boolean incremental() {
+        return incremental;
+    }
+
+    /** @return Incremental index. */
+    public int incrementIndex() {
+        return incIdx;
+    }
+
     /** @return Start time. */
     public long startTime() {
         return startTime;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index d41fdda28ca..08cac31a386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -332,7 +332,15 @@ public class SnapshotRestoreProcess {
             Collection<UUID> bltNodes = F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());
 
             SnapshotOperationRequest req = new SnapshotOperationRequest(
-                fut0.rqId, F.first(dataNodes), snpName, snpPath, cacheGrpNames, new HashSet<>(bltNodes));
+                fut0.rqId,
+                F.first(dataNodes),
+                snpName,
+                snpPath,
+                cacheGrpNames,
+                new HashSet<>(bltNodes),
+                false,
+                -1
+            );
 
             prepareRestoreProc.start(req.requestId(), req);
         });
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index eeaab94b87d..2f34b575bbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
 import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
-import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.StandardOpenOption.READ;
@@ -36,9 +36,6 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
     /** file extension of WAL segment. */
     private static final String WAL_SEGMENT_FILE_EXT = ".wal";
 
-    /** Length of WAL segment file name. */
-    private static final int WAL_SEGMENT_FILE_NAME_LENGTH = 16;
-
     /** File represented by this class. */
     protected final File file;
 
@@ -67,7 +64,7 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
 
         assert fileName.contains(WAL_SEGMENT_FILE_EXT);
 
-        this.idx = idx == null ? Long.parseLong(fileName.substring(0, WAL_SEGMENT_FILE_NAME_LENGTH)) : idx;
+        this.idx = idx == null ? U.fixedLengthFileNumber(fileName) : idx;
     }
 
     /**
@@ -77,16 +74,7 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe
      * @return Segment file name.
      */
     public static String fileName(long idx) {
-        SB b = new SB();
-
-        String segmentStr = Long.toString(idx);
-
-        for (int i = segmentStr.length(); i < WAL_SEGMENT_FILE_NAME_LENGTH; i++)
-            b.a('0');
-
-        b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT);
-
-        return b.toString();
+        return U.fixedLengthNumberName(idx, WAL_SEGMENT_FILE_EXT);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 868871288e6..51a1b88b105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -173,10 +173,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private static final byte[] FILL_BUF = new byte[1024 * 1024];
 
     /** Pattern for segment file names. */
-    public static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
+    public static final Pattern WAL_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal");
 
     /** Pattern for WAL temp files - these files will be cleared at startup. */
-    public static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
+    public static final Pattern WAL_TEMP_NAME_PATTERN = U.fixedLengthNumberNamePattern(".wal.tmp");
 
     /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
     public static final FileFilter WAL_SEGMENT_FILE_FILTER = file -> !file.isDirectory() &&
@@ -187,7 +187,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
 
     /** */
-    public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
+    public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip");
 
     /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
     public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = file -> !file.isDirectory() &&
@@ -195,7 +195,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
 
     /** */
-    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
+    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = U.fixedLengthNumberNamePattern(".wal.zip.tmp");
 
     /** */
     private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = file -> !file.isDirectory() &&
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ec8764393c0..d2101fb2545 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -401,6 +401,9 @@ public abstract class IgniteUtils {
     /** Alphanumeric with underscore regexp pattern. */
     private static final Pattern ALPHANUMERIC_UNDERSCORE_PATTERN = Pattern.compile("^[a-zA-Z_0-9]+$");
 
+    /** Length of numbered file name. */
+    public static final int NUMBER_FILE_NAME_LENGTH = 16;
+
     /** Project home directory. */
     private static volatile GridTuple<String> ggHome;
 
@@ -3364,6 +3367,50 @@ public abstract class IgniteUtils {
         return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
     }
 
+    /**
+     * Generates file name from index.
+     *
+     * @param num Number to generate file name.
+     * @param ext Optional extension
+     * @return File name.
+     */
+    public static String fixedLengthNumberName(long num, @Nullable String ext) {
+        SB b = new SB();
+
+        String segmentStr = Long.toString(num);
+
+        for (int i = segmentStr.length(); i < NUMBER_FILE_NAME_LENGTH; i++)
+            b.a('0');
+
+        b.a(segmentStr);
+
+        if (ext != null)
+            b.a(ext);
+
+        return b.toString();
+    }
+
+    /**
+     * @param fileName File name.
+     * @return Number of this file.
+     */
+    public static long fixedLengthFileNumber(String fileName) {
+        return Long.parseLong(fileName.substring(0, NUMBER_FILE_NAME_LENGTH));
+    }
+
+    /**
+     * @param ext Optional extension.
+     * @return Pattern to match numbered file name with the specific extension.
+     */
+    public static Pattern fixedLengthNumberNamePattern(@Nullable String ext) {
+        String pattern = "\\d{" + NUMBER_FILE_NAME_LENGTH + "}";
+
+        if (ext != null)
+            pattern += ext.replaceAll("\\.", "\\\\\\.");
+
+        return Pattern.compile(pattern);
+    }
+
     /**
      * Verifier always returns successful result for any host.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 0b4d4863597..8fb3c4d8a78 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -3012,7 +3012,7 @@ public class GridFunc {
             V v2 = m2.get(e.getKey());
 
             if (v1 == v2)
-                return true;
+                continue;
 
             if (v1 == null || v2 == null)
                 return false;
@@ -3027,7 +3027,7 @@ public class GridFunc {
                         return false;
                 }
                 else {
-                    if (!eq(v1, v2))
+                    if (!(v1.getClass().isArray() ? arrayEq(v1, v2) : eq(v1, v2)))
                         return false;
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
index 2214a50affb..df34e2487dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTask.java
@@ -52,8 +52,11 @@ public class VisorSnapshotCreateTask extends VisorSnapshotOneNodeTask<VisorSnaps
 
         /** {@inheritDoc} */
         @Override protected String run(VisorSnapshotCreateTaskArg arg) throws IgniteException {
-            IgniteFutureImpl<Void> fut =
-                ignite.context().cache().context().snapshotMgr().createSnapshot(arg.snapshotName(), arg.snapshotPath());
+            IgniteFutureImpl<Void> fut = ignite.context().cache().context().snapshotMgr().createSnapshot(
+                arg.snapshotName(),
+                arg.snapshotPath(),
+                arg.incremental()
+            );
 
             IgniteSnapshotManager.ClusterSnapshotFuture snpFut =
                 fut.internalFuture() instanceof IgniteSnapshotManager.ClusterSnapshotFuture ?
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
index a828cf9ec9d..eadfab88551 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotCreateTaskArg.java
@@ -32,6 +32,9 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
     /** Synchronous execution flag. */
     private boolean sync;
 
+    /** Incremental snapshot flag. */
+    private boolean inc;
+
     /** Default constructor. */
     public VisorSnapshotCreateTaskArg() {
         // No-op.
@@ -41,11 +44,13 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
      * @param snpName Snapshot name.
      * @param snpPath Snapshot directory path.
      * @param sync Synchronous execution flag.
+     * @param inc Incremental snapshot flag.
      */
-    public VisorSnapshotCreateTaskArg(String snpName, String snpPath, boolean sync) {
+    public VisorSnapshotCreateTaskArg(String snpName, String snpPath, boolean sync, boolean inc) {
         super(snpName, snpPath);
 
         this.sync = sync;
+        this.inc = inc;
     }
 
     /** @return Synchronous execution flag. */
@@ -53,11 +58,17 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
         return sync;
     }
 
+    /** @return Incremental snapshot flag. */
+    public boolean incremental() {
+        return inc;
+    }
+
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
         super.writeExternalData(out);
 
         out.writeBoolean(sync);
+        out.writeBoolean(inc);
     }
 
     /** {@inheritDoc} */
@@ -65,6 +76,7 @@ public class VisorSnapshotCreateTaskArg extends VisorSnapshotCheckTaskArg {
         super.readExternalData(ver, in);
 
         sync = in.readBoolean();
+        inc = in.readBoolean();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
index 86c07c4740a..bb5778df955 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/snapshot/VisorSnapshotRestoreTaskArg.java
@@ -56,7 +56,7 @@ public class VisorSnapshotRestoreTaskArg extends VisorSnapshotCreateTaskArg {
         VisorSnapshotRestoreTaskAction action,
         @Nullable Collection<String> grpNames
     ) {
-        super(snpName, snpPath, sync);
+        super(snpName, snpPath, sync, false);
 
         this.action = action;
         this.grpNames = grpNames;
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
index d8b06eaccd9..479268f0264 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/SnapshotMXBean.java
@@ -40,6 +40,21 @@ public interface SnapshotMXBean {
             String snpPath
     );
 
+    /**
+     * Create the cluster-wide incremental snapshot for the given base snapshot.
+     *
+     * @param fullSnapshot Full snapshot name to attach incremental snapshot to.
+     * @param fullSnapshotPath Full snapshot directory path.
+     * @see IgniteSnapshot#createSnapshot(String)
+     * @see IgniteSnapshot#createIncrementalSnapshot(String)
+     */
+    public void createIncrementalSnapshot(
+        @MXBeanParameter(name = "fullSnapshot", description = "Snapshot name.")
+            String fullSnapshot,
+        @MXBeanParameter(name = "fullSnapshotPath", description = "Optional snapshot directory path.")
+            String fullSnapshotPath
+    );
+
     /**
      * Cancel previously started snapshot operation on the node initiator.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
index 9aa661a6acb..3623553ee4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java
@@ -103,6 +103,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.file.FileP
 import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_SNAPSHOT_TMP_DIR;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.incrementalSnapshotMetaFileName;
 import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.resolveSnapshotWorkDirectory;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -646,6 +647,15 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest {
         return snpFutTask;
     }
 
+    /** Checks incremental snapshot exists. */
+    protected boolean checkIncremental(IgniteEx node, String snpName, String snpPath, int incIdx) {
+        File incSnpDir = snp(node).incrementalSnapshotLocalDir(snpName, snpPath, incIdx);
+
+        return incSnpDir.exists()
+            && incSnpDir.isDirectory()
+            && new File(incSnpDir, incrementalSnapshotMetaFileName(incIdx)).exists();
+    }
+
     /**
      * @param ignite Ignite instance to resolve discovery spi to.
      * @return BlockingCustomMessageDiscoverySpi instance.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
index a3b4600b5b0..13edc33a16b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotHandlerTest.java
@@ -394,7 +394,7 @@ public class IgniteClusterSnapshotHandlerTest extends IgniteClusterSnapshotResto
 
             IgniteSnapshotManager snpMgr = ignite.context().cache().context().snapshotMgr();
 
-            snpMgr.createSnapshot(snpName, snpDir.getAbsolutePath()).get(TIMEOUT);
+            snpMgr.createSnapshot(snpName, snpDir.getAbsolutePath(), false).get(TIMEOUT);
 
             ignite.destroyCache(DEFAULT_CACHE_NAME);
             awaitPartitionMapExchange();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
index 0ec6648f45d..030c006c818 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreSelfTest.java
@@ -138,7 +138,9 @@ public class IgniteClusterSnapshotRestoreSelfTest extends IgniteClusterSnapshotR
             for (int i = 0; i < CACHE_KEYS_RANGE; i++)
                 ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
 
-            ignite.context().cache().context().snapshotMgr().createSnapshot(SNAPSHOT_NAME, snpDir.toString()).get(TIMEOUT);
+            ignite.context().cache().context().snapshotMgr()
+                .createSnapshot(SNAPSHOT_NAME, snpDir.toString(), false)
+                .get(TIMEOUT);
 
             // Check snapshot.
             IdleVerifyResultV2 res = snp(ignite).checkSnapshot(SNAPSHOT_NAME, snpDir.getAbsolutePath()).get(TIMEOUT);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index cffa3156c4d..f9317a47808 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -101,6 +101,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.snapshot.I
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeFalse;
 
 /**
  * Cluster-wide snapshot test.
@@ -466,7 +467,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
 
         IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
 
-        spi.waitBlocked(10_000L);
+        spi.waitBlocked(TIMEOUT);
 
         // Creating of new caches should not be blocked.
         ignite.getOrCreateCache(dfltCacheCfg.setName("default2"))
@@ -502,7 +503,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
 
         IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
 
-        spi.waitBlocked(10_000L);
+        spi.waitBlocked(TIMEOUT);
 
         // Not baseline node joins successfully.
         String grid4Dir = folderName(startGrid(4));
@@ -527,21 +528,34 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
     /** @throws Exception If fails. */
     @Test
     public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
-        IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+        for (boolean inc : new boolean[] {false, true}) {
+            IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
 
-        BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
-        spi.block((msg) -> msg instanceof FullMessage);
+            if (inc) {
+                assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
 
-        IgniteFuture<Void> fut = ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
+                ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+            }
 
-        spi.waitBlocked(10_000L);
+            BlockingCustomMessageDiscoverySpi spi = discoSpi(ignite);
+            spi.block((msg) -> msg instanceof FullMessage);
 
-        ignite.close();
+            IgniteFuture<Void> fut = inc
+                ? ignite.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME)
+                : ignite.snapshot().createSnapshot(SNAPSHOT_NAME);
 
-        assertThrowsAnyCause(log,
-            fut::get,
-            NodeStoppingException.class,
-            SNP_NODE_STOPPING_ERR_MSG);
+            spi.waitBlocked(TIMEOUT);
+
+            ignite.close();
+
+            assertThrowsAnyCause(log,
+                fut::get,
+                NodeStoppingException.class,
+                SNP_NODE_STOPPING_ERR_MSG);
+
+            stopAllGrids();
+            cleanPersistenceDir();
+        }
     }
 
     /** @throws Exception If fails. */
@@ -611,7 +625,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
         assertTrue("Snapshot directory must be empty for node 1 due to snapshot future fail: " + dirNameIgnite1,
             !searchDirectoryRecursively(locSnpDir.toPath(), dirNameIgnite1).isPresent());
 
-        List<String> allSnapshots = snp(ignite).localSnapshotNames();
+        List<String> allSnapshots = snp(ignite).localSnapshotNames(null);
 
         assertTrue("Snapshot directory must be empty due to snapshot fail: " + allSnapshots,
             allSnapshots.isEmpty());
@@ -668,7 +682,10 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
 
         awaitPartitionMapExchange();
 
-        assertTrue("Snapshot directory must be empty", grid2.context().cache().context().snapshotMgr().localSnapshotNames().isEmpty());
+        assertTrue(
+            "Snapshot directory must be empty",
+            grid2.context().cache().context().snapshotMgr().localSnapshotNames(null).isEmpty()
+        );
 
         ignite.snapshot().createSnapshot(SNAPSHOT_NAME)
             .get();
@@ -747,7 +764,7 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
                 ignite.cache(DEFAULT_CACHE_NAME).put(i, i);
 
             ignite.context().cache().context().snapshotMgr()
-                .createSnapshot(SNAPSHOT_NAME, cfgPath ? null : snpDir.getAbsolutePath()).get();
+                .createSnapshot(SNAPSHOT_NAME, cfgPath ? null : snpDir.getAbsolutePath(), false).get();
 
             stopAllGrids();
 
@@ -773,7 +790,9 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
             GridKernalContext kctx = grid(idx).context();
 
             assertThrowsAnyCause(log,
-                () -> kctx.cache().context().snapshotMgr().createSnapshot(SNAPSHOT_NAME, invalidPath).get(TIMEOUT),
+                () -> kctx.cache().context().snapshotMgr()
+                    .createSnapshot(SNAPSHOT_NAME, invalidPath, false)
+                    .get(TIMEOUT),
                 IgniteCheckedException.class,
                 invalidPath);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
index 837038b31c8..867c006be4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotMXBeanTest.java
@@ -75,6 +75,15 @@ public class IgniteSnapshotMXBeanTest extends AbstractSnapshotSelfTest {
         assertTrue("Waiting for snapshot operation failed.",
             GridTestUtils.waitForCondition(() -> (long)getMetric("LastSnapshotEndTime", snpMBean) > 0, TIMEOUT));
 
+        if (!encryption) {
+            mxBean.createIncrementalSnapshot(SNAPSHOT_NAME, "");
+
+            assertTrue(
+                "Waiting for incremental snapshot failed",
+                GridTestUtils.waitForCondition(() -> checkIncremental(ignite, SNAPSHOT_NAME, null, 1), TIMEOUT)
+            );
+        }
+
         stopAllGrids();
 
         IgniteEx snp = startGridsFromSnapshot(2, SNAPSHOT_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
new file mode 100644
index 00000000000..340955a3ae6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assume.assumeFalse;
+
+/**
+ * Basic tests for incremental snapshots.
+ */
+public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
+    /** */
+    public static final int GRID_CNT = 3;
+
+    /** */
+    public static final String OTHER_CACHE = "other-cache";
+
+    /** */
+    public static final String GROUPED_CACHE = "my-grouped-cache2";
+
+    /** */
+    @Test
+    public void testCreation() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx srv = startGridsWithCache(
+            GRID_CNT,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        File snpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+        assertTrue("Target directory is not empty: " + snpDir, F.isEmpty(snpDir.list()));
+
+        IgniteEx cli = startClientGrid(
+            GRID_CNT,
+            (UnaryOperator<IgniteConfiguration>)
+                cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+        );
+
+        File exSnpDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ex_snapshots", true);
+
+        for (boolean client : new boolean[] {false, true}) {
+            IgniteSnapshotManager snpCreate = snp(client ? cli : srv);
+
+            for (File snpPath : new File[] {null, exSnpDir}) {
+                if (client && snpPath != null) // Set snapshot path not supported for snapshot from client nodes.
+                    continue;
+
+                String snpName = SNAPSHOT_NAME + "_" + client + "_" + (snpPath == null ? "" : snpPath.getName());
+
+                if (snpPath == null)
+                    snpCreate.createSnapshot(snpName).get(TIMEOUT);
+                else
+                    snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), false).get(TIMEOUT);
+
+                for (int incIdx = 1; incIdx < 3; incIdx++) {
+                    if (snpPath == null)
+                        snpCreate.createIncrementalSnapshot(snpName).get(TIMEOUT);
+                    else
+                        snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), true).get(TIMEOUT);
+
+                    for (int gridIdx = 0; gridIdx < GRID_CNT; gridIdx++) {
+                        assertTrue("Incremental snapshot must exists on node " + gridIdx, checkIncremental(
+                            grid(gridIdx),
+                            snpName,
+                            snpPath == null ? null : snpPath.getAbsolutePath(),
+                            incIdx
+                        ));
+                    }
+                }
+            }
+        }
+    }
+
+    /** */
+    @Test
+    public void testFailForUnknownBaseSnapshot() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx ign = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+        assertThrowsWithCause(
+            () -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
+            IgniteException.class
+        );
+
+        snp(ign).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        assertThrowsWithCause(
+            () -> snp(ign).createIncrementalSnapshot("unknown").get(TIMEOUT),
+            IgniteException.class
+        );
+    }
+
+    /** */
+    @Test
+    public void testIncrementalSnapshotFailsOnTopologyChange() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx srv = startGridsWithCache(
+            GRID_CNT,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        IgniteSnapshotManager snpCreate = snp(srv);
+
+        snpCreate.createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        String consId = grid(1).context().discovery().localNode().consistentId().toString();
+
+        // Stop some node.
+        stopGrid(1);
+
+        assertThrows(
+            null,
+            () -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class,
+            "Create incremental snapshot request has been rejected. " +
+                "Node from full snapshot offline [consistentId=" + consId + ']'
+        );
+    }
+
+    /** */
+    @Test
+    public void testIncrementalSnapshotFailsOnCacheDestroy() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        checkFailWhenCacheDestroyed(OTHER_CACHE, "Create incremental snapshot request has been rejected. " +
+            "Cache group destroyed [groupId=" + CU.cacheId(OTHER_CACHE) + ']');
+    }
+
+    /** */
+    @Test
+    public void testIncrementalSnapshotFailsOnGroupedCacheDestroy() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        checkFailWhenCacheDestroyed(GROUPED_CACHE, "Create incremental snapshot request has been rejected. " +
+            "Cache destroyed [cacheId=" + CU.cacheId(GROUPED_CACHE) + ", cacheName=" + GROUPED_CACHE + ']');
+    }
+
+    /** */
+    @Test
+    public void testIncrementalSnapshotFailsOnCacheChange() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        CacheConfiguration<Integer, Account> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        IgniteEx srv = startGridsWithCache(1, CACHE_KEYS_RANGE, key -> new Account(key, key), ccfg);
+
+        snp(srv).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        GridLocalConfigManager locCfgMgr = srv.context().cache().configManager();
+
+        File ccfgFile = locCfgMgr.cacheConfigurationFile(ccfg);
+
+        StoredCacheData cacheData = locCfgMgr.readCacheData(ccfgFile);
+
+        assertNotNull(cacheData);
+
+        cacheData.queryEntities(Collections.singletonList(new QueryEntity(String.class, Account.class)));
+
+        locCfgMgr.writeCacheData(cacheData, ccfgFile);
+
+        assertThrows(
+            null,
+            () -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class,
+            "Cache changed [cacheId=" + CU.cacheId(DEFAULT_CACHE_NAME) + ", cacheName=" + DEFAULT_CACHE_NAME + ']'
+        );
+    }
+
+    /** */
+    @Test
+    public void testIncrementalSnapshotFailOnDirtyDir() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx srv = startGridsWithCache(
+            GRID_CNT,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        snp(srv).createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        assertTrue(snp(srv).incrementalSnapshotsLocalRootDir(SNAPSHOT_NAME, null).mkdirs());
+        assertTrue(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).createNewFile());
+
+        assertThrows(
+            null,
+            () -> snp(srv).createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class,
+            "Can't create snapshot directory"
+        );
+
+        for (int i = 0; i < GRID_CNT; i++)
+            assertFalse(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).exists());
+    }
+
+    /** */
+    private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) throws Exception {
+        IgniteEx srv = startGridsWithCache(
+            1,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME),
+            new CacheConfiguration<>(OTHER_CACHE),
+            new CacheConfiguration<Integer, Object>("my-grouped-cache1").setGroupName("mygroup"),
+            new CacheConfiguration<Integer, Object>(GROUPED_CACHE).setGroupName("mygroup")
+        );
+
+        IgniteSnapshotManager snpCreate = snp(srv);
+
+        snpCreate.createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        srv.destroyCache(cache2rvm);
+
+        assertThrows(
+            null,
+            () -> snpCreate.createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class,
+            errMsg
+        );
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java
new file mode 100644
index 00000000000..d25cb4e97bb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/lang/GridFuncSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.lang;
+
+import java.util.TreeMap;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** */
+public class GridFuncSelfTest {
+    /** */
+    @Test
+    public void testMapEqNotOrdered() {
+        String str = "mystring";
+
+        TreeMap<String, Object> m1 = new TreeMap<>();
+
+        TreeMap<String, Object> m2 = new TreeMap<>();
+
+        m1.put("1", str);
+        m2.put("1", str);
+
+        m1.put("2", "2");
+        m2.put("3", "3");
+
+        assertFalse(F.eqNotOrdered(m1, m2));
+
+        m1.remove("2");
+        m2.remove("3");
+
+        m1.put("arr", new byte[] {1, 2, 3});
+        m2.put("arr", new byte[] {1, 2, 3});
+
+        assertTrue(F.eqNotOrdered(m1, m2));
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
index 7618ff25521..83689482659 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IncrementalSnapshotTest;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.PlainSnapshotTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -46,7 +47,8 @@ import org.junit.runners.Suite;
     IgniteSnapshotRestoreFromRemoteTest.class,
     PlainSnapshotTest.class,
     EncryptedSnapshotTest.class,
-    IgniteClusterSnapshotWalRecordTest.class
+    IgniteClusterSnapshotWalRecordTest.class,
+    IncrementalSnapshotTest.class
 })
 public class IgniteSnapshotTestSuite {
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 5126a479aea..c003db0ea2d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.IgniteDevOnlyLogTest;
 import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest;
 import org.apache.ignite.internal.util.IgniteUtilsSelfTest;
 import org.apache.ignite.internal.util.IgniteUtilsUnitTest;
+import org.apache.ignite.internal.util.lang.GridFuncSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioSelfTest;
 import org.apache.ignite.internal.util.nio.GridNioServerTest;
@@ -109,6 +110,7 @@ import org.junit.runners.Suite;
     GridTransientTest.class,
     IgniteDevOnlyLogTest.class,
     GridConcurrentMultiPairQueueTest.class,
+    GridFuncSelfTest.class,
 
     // Sensitive toString.
     IncludeSensitiveAtomicTest.class,
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
index c1b37a48c3c..ef334798d3b 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassTest_help.output
@@ -140,12 +140,13 @@ This utility can do the following commands:
       snapshot_name  - Snapshot name.
 
   Create cluster snapshot:
-    control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync]
+    control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync] [--incremental]
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --dest path    - Path to the directory where the snapshot will be saved. If not specified, the default configured snapshot directory will be used.
       --sync         - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
+      --incremental  - Create an incremental snapshot for previously created full snapshot. Full snapshot must be accessible via --dest and snapshot_name.
 
   Cancel running snapshot operation:
     control.(sh|bat) --snapshot cancel --id id|--name name
@@ -158,14 +159,14 @@ This utility can do the following commands:
     control.(sh|bat) --snapshot check snapshot_name [--src path]
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --src path     - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
 
   Restore snapshot:
     control.(sh|bat) --snapshot restore snapshot_name [--groups group1,...groupN] [--src path] [--sync]
 
     Parameters:
-      snapshot_name              - Snapshot name.
+      snapshot_name              - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --groups group1,...groupN  - Cache group names.
       --src path                 - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
       --sync                     - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
@@ -174,13 +175,13 @@ This utility can do the following commands:
     control.(sh|bat) --snapshot restore snapshot_name --status
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
 
   Cancel snapshot restore operation (Command deprecated. Use '--snapshot cancel' instead):
     control.(sh|bat) --snapshot restore snapshot_name --cancel
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
 
   Get the status of the current snapshot operation:
     control.(sh|bat) --snapshot status
diff --git a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
index c1b37a48c3c..ef334798d3b 100644
--- a/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
+++ b/modules/core/src/test/resources/org.apache.ignite.util/GridCommandHandlerClusterByClassWithSSLTest_help.output
@@ -140,12 +140,13 @@ This utility can do the following commands:
       snapshot_name  - Snapshot name.
 
   Create cluster snapshot:
-    control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync]
+    control.(sh|bat) --snapshot create snapshot_name [--dest path] [--sync] [--incremental]
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --dest path    - Path to the directory where the snapshot will be saved. If not specified, the default configured snapshot directory will be used.
       --sync         - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
+      --incremental  - Create an incremental snapshot for previously created full snapshot. Full snapshot must be accessible via --dest and snapshot_name.
 
   Cancel running snapshot operation:
     control.(sh|bat) --snapshot cancel --id id|--name name
@@ -158,14 +159,14 @@ This utility can do the following commands:
     control.(sh|bat) --snapshot check snapshot_name [--src path]
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --src path     - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
 
   Restore snapshot:
     control.(sh|bat) --snapshot restore snapshot_name [--groups group1,...groupN] [--src path] [--sync]
 
     Parameters:
-      snapshot_name              - Snapshot name.
+      snapshot_name              - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
       --groups group1,...groupN  - Cache group names.
       --src path                 - Path to the directory where the snapshot files are located. If not specified, the default configured snapshot directory will be used.
       --sync                     - Run the operation synchronously, the command will wait for the entire operation to complete. Otherwise, it will be performed in the background, and the command will immediately return control.
@@ -174,13 +175,13 @@ This utility can do the following commands:
     control.(sh|bat) --snapshot restore snapshot_name --status
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
 
   Cancel snapshot restore operation (Command deprecated. Use '--snapshot cancel' instead):
     control.(sh|bat) --snapshot restore snapshot_name --cancel
 
     Parameters:
-      snapshot_name  - Snapshot name.
+      snapshot_name  - Snapshot name. In case incremental snapshot (--incremental) full snapshot name must be provided.
 
   Get the status of the current snapshot operation:
     control.(sh|bat) --snapshot status