You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/10/26 14:03:06 UTC

[ignite] branch IGNITE-17177_inc_snapshots updated: IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-17177_inc_snapshots
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push:
     new 4f1a3033a28 IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)
4f1a3033a28 is described below

commit 4f1a3033a284f9348221f134b32f79a50f1375ea
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Oct 26 17:02:58 2022 +0300

    IGNITE-17613 Copy WAL segments in incremental snapshot (#10292)
---
 .../pagemem/wal/IgniteWriteAheadLogManager.java    |  23 ++++
 .../snapshot/IgniteSnapshotManager.java            | 139 ++++++++++++++++-----
 .../snapshot/IncrementalSnapshotFutureTask.java    |  67 +++++++++-
 .../snapshot/IncrementalSnapshotMetadata.java      |   5 +
 .../snapshot/SnapshotHandlerRestoreTask.java       |   3 +-
 .../snapshot/SnapshotPartitionsVerifyTask.java     |   3 +-
 .../persistence/wal/FileWriteAheadLogManager.java  |  84 ++++++++-----
 .../wal/aware/SegmentArchivedStorage.java          |   4 +-
 .../cache/persistence/wal/aware/SegmentAware.java  |  14 ++-
 .../wal/aware/SegmentCompressStorage.java          |  21 ++++
 .../wal/aware/SegmentReservationStorage.java       |   4 +-
 .../cache/persistence/pagemem/NoOpWALManager.java  |  16 +++
 .../snapshot/IncrementalSnapshotTest.java          | 134 ++++++++++++++++++++
 13 files changed, 448 insertions(+), 69 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 98d99d613cc..832beef8bf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.pagemem.wal;
 
+import java.io.File;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
@@ -231,4 +233,25 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
      * Start automatically releasing segments when reaching {@link DataStorageConfiguration#getMaxWalArchiveSize()}.
      */
     void startAutoReleaseSegments();
+
+    /**
+     * Archive directory if any.
+     *
+     * @return Archive directory.
+     */
+    @Nullable File archiveDir();
+
+    /**
+     * @param idx Segment index.
+     * @return Compressed archive segment.
+     */
+    @Nullable File compactedSegment(long idx);
+
+    /**
+     * Blocks current thread while segment with the {@code idx} not compressed.
+     * If segment compressed, already, returns immediately.
+     *
+     * @param idx Segment index.
+     */
+    void awaitCompacted(long idx) throws IgniteInterruptedCheckedException;
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index cd894261b73..7be795ccf85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -106,6 +105,9 @@ import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.managers.systemview.walker.SnapshotViewWalker;
 import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
@@ -132,6 +134,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
@@ -223,6 +226,7 @@ import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
 import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT;
 import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+import static org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
 import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_SNAPSHOT;
 import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW;
 import static org.apache.ignite.spi.systemview.view.SnapshotView.SNAPSHOT_SYS_VIEW_DESC;
@@ -814,12 +818,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         }
 
         if (req.incremental()) {
-            SnapshotMetadata meta = readSnapshotMetadata(new File(
-                snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
-                snapshotMetaFileName(cctx.localNode().consistentId().toString())
-            ));
+            SnapshotMetadata meta;
 
             try {
+                meta = readSnapshotMetadata(new File(
+                    snapshotLocalDir(req.snapshotName(), req.snapshotPath()),
+                    snapshotMetaFileName(cctx.localNode().consistentId().toString())
+                ));
+
                 checkIncrementalCanBeCreated(req.snapshotName(), req.snapshotPath(), meta);
             }
             catch (IgniteCheckedException | IOException e) {
@@ -842,6 +848,46 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         SnapshotMetadata meta
     ) {
         File incSnpDir = incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex());
+        WALPointer lowPtr, highPtr;
+
+        if (req.incrementIndex() == 1)
+            lowPtr = meta.snapshotRecordPointer();
+        else {
+            int prevIdx = req.incrementIndex() - 1;
+
+            IncrementalSnapshotMetadata prevIncSnpMeta;
+
+            try {
+                prevIncSnpMeta = readFromFile(new File(
+                    incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), prevIdx),
+                    incrementalSnapshotMetaFileName(prevIdx)
+                ));
+            }
+            catch (IgniteCheckedException | IOException e) {
+                return new GridFinishedFuture<>(e);
+            }
+
+            lowPtr = prevIncSnpMeta.cutPointer();
+        }
+
+        cctx.database().checkpointReadLock();
+
+        try {
+            highPtr = cctx.wal().log(new ClusterSnapshotRecord(req.snapshotName()));
+
+            // Dummy way to forcefully switch to the next segment.
+            // TODO: Must be replaced with the actual ConsistentCut logging.
+            cctx.wal().log(new ClusterSnapshotRecord(req.snapshotName()), RolloverType.NEXT_SEGMENT);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+        finally {
+            cctx.database().checkpointReadUnlock();
+        }
+
+        // For now, forcefully rollover to the next WAL segments to make segments waiting possible.
+        assert cctx.wal().currentSegment() >= highPtr.index() : "Rollover must be invoked.";
 
         IgniteInternalFuture<SnapshotOperationResponse> task0 = registerTask(req.snapshotName(), new IncrementalSnapshotFutureTask(
             cctx,
@@ -851,7 +897,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
             req.snapshotPath(),
             req.incrementIndex(),
             tmpWorkDir,
-            ioFactory
+            ioFactory,
+            lowPtr,
+            highPtr
         )).chain(fut -> {
             if (fut.error() != null)
                 throw F.wrap(fut.error());
@@ -864,7 +912,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 req.incrementIndex(),
                 cctx.localNode().consistentId().toString(),
                 pdsSettings.folderName(),
-                null /* WAL Pointer for CUT record goes here. */
+                highPtr
             );
 
             writeSnapshotMetafile(
@@ -1600,7 +1648,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
      * @param consId Node consistent id to read metadata for.
      * @return Snapshot metadata instance.
      */
-    public SnapshotMetadata readSnapshotMetadata(File snpDir, String consId) {
+    public SnapshotMetadata readSnapshotMetadata(File snpDir, String consId) throws IgniteCheckedException, IOException {
         return readSnapshotMetadata(new File(snpDir, snapshotMetaFileName(consId)));
     }
 
@@ -1608,26 +1656,31 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
      * @param smf File denoting to snapshot metafile.
      * @return Snapshot metadata instance.
      */
-    private SnapshotMetadata readSnapshotMetadata(File smf) {
-        if (!smf.exists())
-            throw new IgniteException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+    private SnapshotMetadata readSnapshotMetadata(File smf) throws IgniteCheckedException, IOException {
+        SnapshotMetadata meta = readFromFile(smf);
 
         String smfName = smf.getName().substring(0, smf.getName().length() - SNAPSHOT_METAFILE_EXT.length());
 
-        try (InputStream in = new BufferedInputStream(new FileInputStream(smf))) {
-            SnapshotMetadata meta = marsh.unmarshal(in, U.resolveClassLoader(cctx.gridConfig()));
+        if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
+            throw new IgniteException(
+                "Error reading snapshot metadata [smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId())
+            );
+        }
 
-            if (!U.maskForFileName(meta.consistentId()).equals(smfName)) {
-                throw new IgniteException(
-                    "Error reading snapshot metadata [smfName=" + smfName + ", consId=" + U.maskForFileName(meta.consistentId())
-                );
-            }
+        return meta;
+    }
 
-            return meta;
-        }
-        catch (IgniteCheckedException | IOException e) {
-            throw new IgniteException("An error occurred during reading snapshot metadata file [file=" +
-                smf.getAbsolutePath() + "]", e);
+    /**
+     * @param smf File to read.
+     * @return Read metadata.
+     * @param <T> Type of metadata.
+     */
+    private <T> T readFromFile(File smf) throws IgniteCheckedException, IOException {
+        if (!smf.exists())
+            throw new IgniteCheckedException("Snapshot metafile cannot be read due to it doesn't exist: " + smf);
+
+        try (InputStream in = new BufferedInputStream(Files.newInputStream(smf.toPath()))) {
+            return marsh.unmarshal(in, U.resolveClassLoader(cctx.gridConfig()));
         }
     }
 
@@ -1665,15 +1718,20 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
         Map<String, SnapshotMetadata> metasMap = new HashMap<>();
         SnapshotMetadata prev = null;
 
-        for (File smf : smfs) {
-            SnapshotMetadata curr = readSnapshotMetadata(smf);
+        try {
+            for (File smf : smfs) {
+                SnapshotMetadata curr = readSnapshotMetadata(smf);
 
-            if (prev != null && !prev.sameSnapshot(curr))
-                throw new IgniteException("Snapshot metadata files are from different snapshots [prev=" + prev + ", curr=" + curr);
+                if (prev != null && !prev.sameSnapshot(curr))
+                    throw new IgniteException("Snapshot metadata files are from different snapshots [prev=" + prev + ", curr=" + curr);
 
-            metasMap.put(curr.consistentId(), curr);
+                metasMap.put(curr.consistentId(), curr);
 
-            prev = curr;
+                prev = curr;
+            }
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
         }
 
         SnapshotMetadata currNodeSmf = metasMap.remove(cctx.localNode().consistentId().toString());
@@ -1767,6 +1825,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
                 }
 
                 if (incremental) {
+                    if (!cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()) {
+                        throw new IgniteException("Create incremental snapshot request has been rejected. " +
+                            "WAL compaction must be enabled.");
+                    }
+
                     if (!snpExists) {
                         throw new IgniteException("Create incremental snapshot request has been rejected. " +
                                 "Base snapshot with given name doesn't exist on local node.");
@@ -2443,6 +2506,24 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
     ) throws IgniteCheckedException, IOException {
         File snpDir = snapshotLocalDir(name, snpPath);
 
+        IgniteWriteAheadLogManager wal = cctx.wal();
+
+        if (wal == null) {
+            throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                "WAL must be eanbled."
+            );
+        }
+
+        File archiveDir = wal.archiveDir();
+
+        if (archiveDir == null) {
+            throw new IgniteCheckedException("Create incremental snapshot request has been rejected. " +
+                "WAL archive must be eanbled."
+            );
+        }
+
+        ensureHardLinkAvailable(archiveDir.toPath(), snpDir.toPath());
+
         Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
             .stream()
             .map(node -> node.consistentId().toString())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index aa9e2208c05..2bbaad1d4c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -18,14 +18,18 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiConsumer;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,16 +46,28 @@ class IncrementalSnapshotFutureTask
     /** Metadata of the full snapshot. */
     private final Set<Integer> affectedCacheGrps;
 
+    /**
+     * Pointer to the previous snapshot record.
+     * In case first increment snapshot will point to the {@link ClusterSnapshotRecord}.
+     * For second and subsequent incements on the previous consistent cut record.
+     */
+    private final WALPointer lowPtr;
+
+    /** Current consistent cut WAL pointer. */
+    private final WALPointer highPtr;
+
     /** */
     public IncrementalSnapshotFutureTask(
         GridCacheSharedContext<?, ?> cctx,
         UUID srcNodeId,
         UUID reqNodeId,
         SnapshotMetadata meta,
-        String snpPath,
+        @Nullable String snpPath,
         int incIdx,
         File tmpWorkDir,
-        FileIOFactory ioFactory
+        FileIOFactory ioFactory,
+        WALPointer lowPtr,
+        WALPointer highPtr
     ) {
         super(
             cctx,
@@ -82,6 +98,8 @@ class IncrementalSnapshotFutureTask
         this.incIdx = incIdx;
         this.snpPath = snpPath;
         this.affectedCacheGrps = new HashSet<>(meta.cacheGroupIds());
+        this.lowPtr = lowPtr;
+        this.highPtr = highPtr;
 
         cctx.cache().configManager().addConfigurationChangeListener(this);
     }
@@ -102,7 +120,50 @@ class IncrementalSnapshotFutureTask
                 return false;
             }
 
-            onDone(new IncrementalSnapshotFutureTaskResult());
+            cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> {
+                try {
+                    // First increment must include low segment, because full snapshot knows nothing about WAL.
+                    // All other begins from the next segment because lowPtr already saved inside previous increment.
+                    long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1);
+                    long highIdx = highPtr.index();
+
+                    assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled()
+                        : "WAL Compaction must be enabled";
+                    assert lowIdx <= highIdx;
+
+                    if (log.isInfoEnabled())
+                        log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']');
+
+                    cctx.wal().awaitCompacted(highPtr.index());
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " +
+                            "highIdx=" + highIdx + ']');
+                    }
+
+                    for (; lowIdx <= highIdx; lowIdx++) {
+                        File seg = cctx.wal().compactedSegment(lowIdx);
+
+                        if (!seg.exists()) {
+                            onDone(new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'));
+
+                            return;
+                        }
+
+                        Path segLink = incSnpDir.toPath().resolve(seg.getName());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']');
+
+                        Files.createLink(segLink, seg.toPath());
+                    }
+
+                    onDone(new IncrementalSnapshotFutureTaskResult());
+                }
+                catch (Throwable e) {
+                    onDone(e);
+                }
+            });
 
             return true;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
index c262b070d82..3f4abe5d02d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMetadata.java
@@ -75,6 +75,11 @@ public class IncrementalSnapshotMetadata implements Serializable {
         this.cutPtr = cutPtr;
     }
 
+    /** @return Pointer to consistent cut record. */
+    public WALPointer cutPointer() {
+        return cutPtr;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IncrementalSnapshotMetadata.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
index 947085ab87f..79d019b6985 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -144,7 +145,7 @@ public class SnapshotHandlerRestoreTask extends AbstractSnapshotVerificationTask
                 return snpMgr.handlers().invokeAll(SnapshotHandlerType.RESTORE,
                     new SnapshotHandlerContext(meta, grps, ignite.localNode(), snpDir));
             }
-            catch (IgniteCheckedException e) {
+            catch (IgniteCheckedException | IOException e) {
                 throw new IgniteException(e);
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
index 05f5856b903..29219920fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTask.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -119,7 +120,7 @@ public class SnapshotPartitionsVerifyTask extends AbstractSnapshotVerificationTa
                 return new SnapshotPartitionsVerifyHandler(cctx)
                     .invoke(new SnapshotHandlerContext(meta, rqGrps, ignite.localNode(), snpDir));
             }
-            catch (IgniteCheckedException e) {
+            catch (IgniteCheckedException | IOException e) {
                 throw new IgniteException(e);
             }
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 51a1b88b105..d92895ceaae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -634,6 +634,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
     }
 
+    /** {@inheritDoc} */
+    @Override public @Nullable File archiveDir() {
+        return walArchiveDir;
+    }
+
     /**
      * Collects WAL segments from the archive only if they are all present.
      * Will wait for the last segment to be archived if it is not.
@@ -654,10 +659,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         List<File> res = new ArrayList<>();
 
         for (long i = low.index(); i < high.index(); i++) {
-            String segmentName = fileName(i);
-
-            File file = new File(walArchiveDir, segmentName);
-            File fileZip = new File(walArchiveDir, segmentName + ZIP_SUFFIX);
+            File file = archiveSegment(i, null);
+            File fileZip = compactedSegment(i);
 
             if (file.exists())
                 res.add(file);
@@ -1101,10 +1104,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @return {@code True} exists.
      */
     private boolean hasIndex(long absIdx) {
-        String segmentName = fileName(absIdx);
-
-        boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
-            new File(walArchiveDir, segmentName + ZIP_SUFFIX).exists();
+        boolean inArchive = archiveSegment(absIdx, null).exists() ||
+            compactedSegment(absIdx).exists();
 
         if (inArchive)
             return true;
@@ -2063,11 +2064,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             File origFile = new File(walWorkDir, fileName(segIdx));
 
-            String name = fileName(absIdx);
-
-            File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX);
-
-            File dstFile = new File(walArchiveDir, name);
+            File dstTmpFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, TMP_SUFFIX);
+            File dstFile = FileWriteAheadLogManager.this.archiveSegment(absIdx, null);
 
             if (log.isInfoEnabled()) {
                 log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
@@ -2314,13 +2312,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
                         continue;
 
-                    String segmentFileName = fileName(segIdx);
-
-                    File tmpZip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX + TMP_SUFFIX);
-
-                    File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
-
-                    File raw = new File(walArchiveDir, segmentFileName);
+                    File tmpZip = archiveSegment(segIdx, ZIP_SUFFIX + TMP_SUFFIX);
+                    File zip = compactedSegment(segIdx);
+                    File raw = archiveSegment(segIdx, null);
 
                     long currSize = 0;
                     long reservedSize = raw.length();
@@ -2475,12 +2469,45 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 if (segmentReservedOrLocked(desc.idx))
                     return;
 
-                if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx))
-                    segmentAware.addSize(desc.idx, -deleteArchiveFiles(desc.file));
+                if (desc.idx < lastCheckpointPtr.index() && duplicateIndices.contains(desc.idx)) {
+                    long sz = deleteArchiveFiles(desc.file);
+
+                    segmentAware.addSize(desc.idx, -sz);
+                }
             }
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public File compactedSegment(long idx) {
+        return archiveSegment(idx, ZIP_SUFFIX);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitCompacted(long idx) throws IgniteInterruptedCheckedException {
+        segmentAware.awaitSegmentCompressed(idx);
+    }
+
+    /** */
+    private File archiveSegment(long idx, @Nullable String ext) {
+        return archiveSegment(walArchiveDir, idx, ext);
+    }
+
+    /**
+     * @param walArchiveDir WAL archive directory.
+     * @param idx Segment index.
+     * @param ext Optional extension
+     * @return Path to archive segment.
+     */
+    public static File archiveSegment(File walArchiveDir, long idx, String ext) {
+        String fileName = fileName(idx);
+
+        if (ext != null)
+            fileName += ext;
+
+        return new File(walArchiveDir, fileName);
+    }
+
     /**
      * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
      */
@@ -2525,11 +2552,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     if (segmentToDecompress == -1)
                         continue;
 
-                    String segmentFileName = fileName(segmentToDecompress);
-
-                    File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX);
-                    File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX);
-                    File unzip = new File(walArchiveDir, segmentFileName);
+                    File zip = compactedSegment(segmentToDecompress);
+                    File unzipTmp = archiveSegment(segmentToDecompress, TMP_SUFFIX);
+                    File unzip = archiveSegment(segmentToDecompress, null);
 
                     long currSize = 0;
                     long reservedSize = U.uncompressedSize(zip);
@@ -2606,7 +2631,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (decompressionFutures.containsKey(idx))
                 return decompressionFutures.get(idx);
 
-            File f = new File(walArchiveDir, fileName(idx));
+            File f = archiveSegment(idx, null);
 
             if (f.exists())
                 return new GridFinishedFuture<>();
@@ -2882,8 +2907,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             AbstractFileDescriptor currDesc = desc;
 
             if (!desc.file().exists()) {
-                FileDescriptor zipFile = new FileDescriptor(
-                    new File(walArchiveDir, fileName(desc.idx()) + ZIP_SUFFIX));
+                FileDescriptor zipFile = new FileDescriptor(archiveSegment(walArchiveDir, desc.idx(), ZIP_SUFFIX));
 
                 if (!zipFile.file.exists()) {
                     throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " +
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index 12a1e5f2fdc..3ca1ab82a73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -65,9 +65,9 @@ class SegmentArchivedStorage extends SegmentObservable {
     }
 
     /**
-     * Method will wait activation of particular WAL segment index.
+     * Method will wait archivation of particular WAL segment index.
      *
-     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to become true.
+     * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
      * @throws IgniteInterruptedCheckedException if interrupted.
      */
     synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 728bd7411e5..e4131098e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -130,15 +130,25 @@ public class SegmentAware {
     }
 
     /**
-     * Method will wait activation of particular WAL segment index.
+     * Method will wait archivation of particular WAL segment index.
      *
-     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to become true.
+     * @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
      * @throws IgniteInterruptedCheckedException if interrupted.
      */
     public void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
         segmentArchivedStorage.awaitSegmentArchived(awaitIdx);
     }
 
+    /**
+     * Method will wait activation of particular WAL segment index.
+     *
+     * @param awaitIdx absolute index {@link #lastCompressedIdx()} to become true.
+     * @throws IgniteInterruptedCheckedException if interrupted.
+     */
+    public void awaitSegmentCompressed(long awaitIdx) throws IgniteInterruptedCheckedException {
+        segmentCompressStorage.awaitSegmentCompressed(awaitIdx);
+    }
+
     /**
      * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. Waits if
      * there's no segment to archive right now.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 2272c0ec123..85a9257f0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -79,10 +79,31 @@ class SegmentCompressStorage {
         else
             this.lastCompressedIdx = lastMaxCompressedIdx;
 
+        notifyAll();
+
         if (compressedIdx > lastEnqueuedToCompressIdx)
             lastEnqueuedToCompressIdx = compressedIdx;
     }
 
+    /**
+     * Method will wait activation of particular WAL segment index.
+     *
+     * @param awaitIdx absolute index {@link #lastCompressedIdx()}} to become true.
+     * @throws IgniteInterruptedCheckedException if interrupted.
+     */
+    public synchronized void awaitSegmentCompressed(long awaitIdx) throws IgniteInterruptedCheckedException {
+        while (lastCompressedIdx() < awaitIdx && !interrupted) {
+            try {
+                wait(2000);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        checkInterrupted();
+    }
+
     /**
      * @return Last compressed segment.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
index ee81bff4b34..adc4fbfe670 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -122,7 +122,9 @@ class SegmentReservationStorage extends SegmentObservable {
         Long oldMin = oldMinE == null ? null : oldMinE.getKey();
         Long newMin = newMinE == null ? null : newMinE.getKey();
 
-        return Objects.equals(oldMin, newMin) ? null : newMin == null ? -1 : newMin;
+        return Objects.equals(oldMin, newMin)
+            ? null
+            : newMin == null ? -1 : newMin;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index eccef97f56c..a2863fc70df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
+import java.io.File;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -201,4 +202,19 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
     @Override public void startAutoReleaseSegments() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable File archiveDir() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable File compactedSegment(long idx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitCompacted(long idx) {
+        // No-op.
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
index 340955a3ae6..121a57be913 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java
@@ -19,14 +19,19 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -34,6 +39,7 @@ import org.junit.Test;
 
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.junit.Assume.assumeFalse;
 
 /**
@@ -49,6 +55,23 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
     /** */
     public static final String GROUPED_CACHE = "my-grouped-cache2";
 
+    /** @see DataStorageConfiguration#isWalCompactionEnabled() */
+    public boolean walCompactionEnabled = true;
+
+    /** */
+    private AtomicInteger cntr = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getDataStorageConfiguration()
+            .setWalCompactionEnabled(walCompactionEnabled)
+            .setWalSegmentSize((int)U.MB);
+
+        return cfg;
+    }
+
     /** */
     @Test
     public void testCreation() throws Exception {
@@ -88,6 +111,8 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
                     snpCreate.createSnapshot(snpName, snpPath.getAbsolutePath(), false).get(TIMEOUT);
 
                 for (int incIdx = 1; incIdx < 3; incIdx++) {
+                    addData(cli);
+
                     if (snpPath == null)
                         snpCreate.createIncrementalSnapshot(snpName).get(TIMEOUT);
                     else
@@ -106,6 +131,19 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
         }
     }
 
+    /** */
+    private void addData(IgniteEx node) {
+        IgniteCache<Integer, byte[]> cache = node.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < (int)(U.MB * 3 * GRID_CNT / U.KB); i++) {
+            byte[] bytes = new byte[(int)U.KB];
+
+            ThreadLocalRandom.current().nextBytes(bytes);
+
+            cache.put(cntr.incrementAndGet(), bytes);
+        }
+    }
+
     /** */
     @Test
     public void testFailForUnknownBaseSnapshot() throws Exception {
@@ -127,6 +165,72 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
         );
     }
 
+    /** */
+    @Test
+    public void testFailIfPreviousIncrementNotAvailable() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx srv = startGridsWithCache(
+            GRID_CNT,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        IgniteEx cli = startClientGrid(
+            GRID_CNT,
+            (UnaryOperator<IgniteConfiguration>)
+                cfg -> cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+        );
+
+        cli.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+        cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        File toRmv = snp(ignite(GRID_CNT - 1)).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 2);
+
+        assertTrue(toRmv.exists());
+        assertTrue(toRmv.isDirectory());
+
+        U.delete(toRmv);
+
+        assertThrowsWithCause(
+            () -> cli.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class
+        );
+    }
+
+    /** */
+    @Test
+    public void testFailIfSegmentNotFound() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        IgniteEx srv = startGridsWithCache(
+            1,
+            CACHE_KEYS_RANGE,
+            key -> new Account(key, key),
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+        );
+
+        srv.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+        addData(srv);
+
+        FileWriteAheadLogManager wal = (FileWriteAheadLogManager)srv.context().cache().context().wal();
+
+        assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 0, getTestTimeout()));
+
+        long segIdx = wal.lastCompactedSegment();
+
+        U.delete(wal.compactedSegment(segIdx));
+
+        assertThrowsWithCause(
+            () -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+            IgniteException.class
+        );
+    }
+
     /** */
     @Test
     public void testIncrementalSnapshotFailsOnTopologyChange() throws Exception {
@@ -234,6 +338,36 @@ public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
             assertFalse(snp(srv).incrementalSnapshotLocalDir(SNAPSHOT_NAME, null, 1).exists());
     }
 
+    /** */
+    @Test
+    public void testFailIfWalCompactionDisabled() throws Exception {
+        assumeFalse("https://issues.apache.org/jira/browse/IGNITE-17819", encryption);
+
+        walCompactionEnabled = false;
+
+        try {
+            IgniteEx srv = startGridsWithCache(
+                1,
+                CACHE_KEYS_RANGE,
+                key -> new Account(key, key),
+                new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            );
+
+            srv.snapshot().createSnapshot(SNAPSHOT_NAME).get(TIMEOUT);
+
+            assertThrows(
+                null,
+                () -> srv.snapshot().createIncrementalSnapshot(SNAPSHOT_NAME).get(TIMEOUT),
+                IgniteException.class,
+                "Create incremental snapshot request has been rejected. WAL compaction must be enabled."
+            );
+        }
+        finally {
+            walCompactionEnabled = true;
+        }
+
+    }
+
     /** */
     private void checkFailWhenCacheDestroyed(String cache2rvm, String errMsg) throws Exception {
         IgniteEx srv = startGridsWithCache(