You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/02/10 07:09:05 UTC

[ignite] branch master updated: IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new defc398  IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681.
defc398 is described below

commit defc39832e53b4032ba2883fbdaed77228c99ac2
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed Feb 10 09:48:14 2021 +0300

    IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../persistence/wal/FileWriteAheadLogManager.java  | 228 +++++++++++++---
 .../persistence/wal/WalArchiveConsistencyTest.java | 294 +++++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java     |  16 ++
 .../ignite/testsuites/IgnitePdsTestSuite.java      |   3 +
 4 files changed, 508 insertions(+), 33 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a7eee3c..a22ef7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import java.util.zip.ZipOutputStream;
@@ -127,9 +126,12 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
@@ -216,13 +218,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP);
 
     /**
-     * Percentage of WAL archive size to calculate threshold since which removing of old archive should be started.
-     */
-    private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
-        IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE,
-            DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
-
-    /**
      * Number of WAL compressor worker threads.
      */
     private final int WAL_COMPRESSOR_WORKER_THREAD_CNT =
@@ -407,7 +402,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
 
-        allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+        double thresholdWalArchiveSizePercentage = IgniteSystemProperties.getDouble(
+            IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
+
+        allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * thresholdWalArchiveSizePercentage);
 
         evt = ctx.event();
         failureProcessor = ctx.failure();
@@ -462,8 +460,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             metrics = dbMgr.persistentStoreMetricsImpl();
 
-            checkOrPrepareFiles();
-
             if (metrics != null) {
                 metrics.setWalSizeProvider(new CO<Long>() {
                     /** {@inheritDoc} */
@@ -492,11 +488,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
 
             if (isArchiverEnabled())
-                archiver = new FileArchiver(segmentAware, log);
+                archiver = new FileArchiver(log);
 
             if (!walArchiveUnlimited())
                 cleaner = new FileCleaner(log);
 
+            prepareAndCheckWalFiles();
+
+            if (compressor != null)
+                compressor.initAlreadyCompressedSegments();
+
+            if (archiver != null)
+                archiver.init(segmentAware);
+
             segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
 
             fileHandleManager = fileHandleManagerFactory.build(
@@ -1502,11 +1506,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Deletes temp files creates and prepares new; Creates the first segment if necessary.
+     * Prepare and check WAL files.
      *
      * @throws StorageException If failed.
      */
-    private void checkOrPrepareFiles() throws StorageException {
+    private void prepareAndCheckWalFiles() throws StorageException {
         Collection<File> tmpFiles = new HashSet<>();
 
         for (File walDir : F.asList(walWorkDir, walArchiveDir)) {
@@ -1521,21 +1525,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
         }
 
-        File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
+        if (F.isEmpty(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)))
+            createFile(new File(walWorkDir, fileName(0)));
 
-        if (isArchiverEnabled() && !F.isEmpty(allFiles) && allFiles.length > dsCfg.getWalSegments()) {
-            throw new StorageException("Failed to initialize wal (work directory contains incorrect " +
-                "number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
-        }
+        if (isArchiverEnabled()) {
+            moveSegmentsToArchive();
 
-        // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
-        if (F.isEmpty(allFiles)) {
-            File first = new File(walWorkDir, fileName(0));
+            renameLastSegment();
+
+            formatWorkSegments();
 
-            createFile(first);
-        }
-        else if (isArchiverEnabled())
             checkFiles(0, false, null, null);
+        }
     }
 
     /**
@@ -1724,14 +1725,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /**
          * Constructor.
          *
-         * @param segmentAware Segment aware.
          * @param log Logger.
          */
-        private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException {
+        private FileArchiver(IgniteLogger log) {
             super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
                 cctx.kernalContext().workersRegistry());
-
-            init(segmentAware);
         }
 
         /**
@@ -2048,7 +2046,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         /**
          * Background creation of all segments except first. First segment was created in main thread by {@link
-         * FileWriteAheadLogManager#checkOrPrepareFiles()}
+         * FileWriteAheadLogManager#prepareAndCheckWalFiles()}
          */
         private void allocateRemainingFiles() throws StorageException {
             checkFiles(
@@ -2092,8 +2090,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          */
         FileCompressor(IgniteLogger log) {
             super(0, log);
-
-            initAlreadyCompressedSegments();
         }
 
         /** */
@@ -2936,7 +2932,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (files == null)
                 return Collections.emptyList();
 
-            return Arrays.stream(files).map(File::getName).sorted().collect(Collectors.toList());
+            return Arrays.stream(files).map(File::getName).sorted().collect(toList());
         }
 
         /** {@inheritDoc} */
@@ -3252,4 +3248,170 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             new IgniteThread(this).start();
         }
     }
+
+    /**
+     * Moves work segments to archive if their count exceeds {@link DataStorageConfiguration#getWalSegments()}
+     * or the index of the first segment is not 0; in that case all of them are moved except for the last one.
+     * Compacted (compressed) segments found in the work directory are always moved.
+     *
+     * @throws StorageException If an error occurs while moving.
+     */
+    private void moveSegmentsToArchive() throws StorageException {
+        assert isArchiverEnabled();
+
+        FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+        List<FileDescriptor> toMove = new ArrayList<>();
+
+        if (!F.isEmpty(workSegments) && (workSegments.length > dsCfg.getWalSegments() || workSegments[0].idx() != 0))
+            toMove.addAll(F.asList(workSegments).subList(0, workSegments.length - 1));
+
+        toMove.addAll(F.asList(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER))));
+
+        if (!toMove.isEmpty()) {
+            log.warning("Content of WAL working directory needs rearrangement, some WAL segments will be moved to " +
+                "archive: " + walArchiveDir.getAbsolutePath() + ". Segments from " + toMove.get(0).file().getName() +
+                " to " + toMove.get(toMove.size() - 1).file().getName() + " will be moved, total number of files: " +
+                toMove.size() + ". This operation may take some time.");
+
+            for (int i = 0, j = 0; i < toMove.size(); i++) {
+                FileDescriptor fd = toMove.get(i);
+
+                File tmpDst = new File(walArchiveDir, fd.file().getName() + TMP_SUFFIX);
+                File dst = new File(walArchiveDir, fd.file().getName());
+
+                try {
+                    Files.copy(fd.file().toPath(), tmpDst.toPath());
+
+                    Files.move(tmpDst.toPath(), dst.toPath());
+
+                    Files.delete(fd.file().toPath());
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("WAL segment moved [src=" + fd.file().getAbsolutePath() +
+                            ", dst=" + dst.getAbsolutePath() + ']');
+                    }
+
+                    // Batch output: one info line per group of moved segments, j is the first unreported one.
+                    if (log.isInfoEnabled() && (i == toMove.size() - 1 || (i != 0 && i % 9 == 0))) {
+                        log.info("WAL segments moved: " + toMove.get(j).file().getName() +
+                            (i == j ? "" : " - " + toMove.get(i).file().getName()));
+
+                        j = i + 1;
+                    }
+                }
+                catch (IOException e) {
+                    throw new StorageException("Failed to move WAL segment [src=" + fd.file().getAbsolutePath() +
+                        ", dst=" + dst.getAbsolutePath() + ']', e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Renames the only remaining work segment if its index is not less than {@link DataStorageConfiguration#getWalSegments()}.
+     *
+     * @throws StorageException If an error occurs while renaming.
+     */
+    private void renameLastSegment() throws StorageException {
+        assert isArchiverEnabled();
+
+        FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER));
+
+        if (workSegments.length == 1 && workSegments[0].idx() != workSegments[0].idx() % dsCfg.getWalSegments()) {
+            FileDescriptor toRen = workSegments[0];
+
+            if (log.isInfoEnabled()) {
+                log.info("Last WAL segment file has to be renamed from " + toRen.file().getName() + " to " +
+                    fileName(toRen.idx() % dsCfg.getWalSegments()) + '.');
+            }
+
+            String toRenFileName = fileName(toRen.idx() % dsCfg.getWalSegments());
+
+            File tmpDst = new File(walWorkDir, toRenFileName + TMP_SUFFIX);
+            File dst = new File(walWorkDir, toRenFileName);
+
+            try {
+                Files.copy(toRen.file().toPath(), tmpDst.toPath());
+
+                Files.move(tmpDst.toPath(), dst.toPath());
+
+                Files.delete(toRen.file().toPath());
+
+                if (log.isInfoEnabled()) {
+                    log.info("WAL segment renamed [src=" + toRen.file().getAbsolutePath() +
+                        ", dst=" + dst.getAbsolutePath() + ']');
+                }
+            }
+            catch (IOException e) {
+                throw new StorageException("Failed to rename WAL segment [src=" +
+                    toRen.file().getAbsolutePath() + ", dst=" + dst.getAbsolutePath() + ']', e);
+            }
+        }
+    }
+
+    /**
+     * Pads work segments up to {@link DataStorageConfiguration#getWalSegmentSize()} when WAL mode is FSYNC or mmap is on.
+     *
+     * @throws StorageException If an error occurs when formatting.
+     */
+    private void formatWorkSegments() throws StorageException {
+        assert isArchiverEnabled();
+
+        if (mode == WALMode.FSYNC || mmap) {
+            List<FileDescriptor> toFormat = Arrays.stream(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)))
+                .filter(fd -> fd.file().length() < dsCfg.getWalSegmentSize()).collect(toList());
+
+            if (!toFormat.isEmpty()) {
+                if (log.isInfoEnabled()) {
+                    log.info("WAL segments in working directory should have the same size: '" +
+                        U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + "'. Segments that need reformat " +
+                        "found: " + F.viewReadOnly(toFormat, fd -> fd.file().getName()) + '.');
+                }
+
+                for (int i = 0, j = 0; i < toFormat.size(); i++) {
+                    FileDescriptor fd = toFormat.get(i);
+                    // Keep the temporary copy in the work directory so that the atomic move never crosses file systems.
+                    File tmpDst = new File(walWorkDir, fd.file().getName() + TMP_SUFFIX);
+
+                    try {
+                        Files.copy(fd.file().toPath(), tmpDst.toPath());
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Start formatting WAL segment [filePath=" + tmpDst.getAbsolutePath() +
+                                ", fileSize=" + U.humanReadableByteCount(tmpDst.length()) +
+                                ", toSize=" + U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + ']');
+                        }
+
+                        try (FileIO fileIO = ioFactory.create(tmpDst, CREATE, READ, WRITE)) {
+                            int left = (int)(dsCfg.getWalSegmentSize() - tmpDst.length());
+
+                            fileIO.position(tmpDst.length());
+
+                            while (left > 0)
+                                left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left));
+
+                            fileIO.force();
+                        }
+
+                        Files.move(tmpDst.toPath(), fd.file().toPath(), REPLACE_EXISTING, ATOMIC_MOVE);
+
+                        if (log.isDebugEnabled())
+                            log.debug("WAL segment formatted: " + fd.file().getAbsolutePath());
+
+                        // Batch output: j is the first formatted segment that has not been reported yet.
+                        if (log.isInfoEnabled() && (i == toFormat.size() - 1 || (i != 0 && i % 9 == 0))) {
+                            log.info("WAL segments formatted: " + toFormat.get(j).file().getName() +
+                                (i == j ? "" : " - " + toFormat.get(i).file().getName()));
+
+                            j = i + 1;
+                        }
+                    }
+                    catch (IOException e) {
+                        throw new StorageException("Failed to format WAL segment: " + fd.file().getAbsolutePath(), e);
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
new file mode 100644
index 0000000..43a26c2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Class for testing cases when WAL archive configuration was changed and the node was able to start.
+ */
+@RunWith(Parameterized.class)
+@WithSystemProperty(key = IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, value = "0.0")
+public class WalArchiveConsistencyTest extends GridCommonAbstractTest {
+    /**
+     * WAL mode.
+     */
+    @Parameterized.Parameter
+    public WALMode walMode;
+
+    /**
+     * @return Test parameters.
+     */
+    @Parameterized.Parameters(name = "walMode={0}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(
+            new Object[] {WALMode.LOG_ONLY},
+            new Object[] {WALMode.FSYNC}
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setWalSegments(10)
+                    .setWalSegmentSize((int)U.MB)
+                    .setMaxWalArchiveSize(10 * U.MB)
+                    .setWalMode(walMode)
+                    .setWalFsyncDelayNanos(100)
+                    .setDefaultDataRegionConfiguration(
+                        new DataRegionConfiguration()
+                            .setPersistenceEnabled(true)
+                            .setMaxSize(U.GB)
+                    )
+            );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgOp) throws Exception {
+        IgniteEx n = super.startGrid(idx, cfgOp);
+
+        n.cluster().state(ClusterState.ACTIVE);
+        awaitPartitionMapExchange();
+
+        return n;
+    }
+
+    /**
+     * Verify that when switching WAL archive off -> on and increasing the
+     * number of WAL segments on restarting the node, the recovery will be consistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIncreaseWalSegmentsWithoutTruncate() throws Exception {
+        checkRecoveryWithoutWalTruncate(12);
+    }
+
+    /**
+     * Verify that when switching WAL archive off -> on and decreasing the
+     * number of WAL segments on restarting the node, the recovery will be consistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDecreaseWalSegmentsWithoutTruncate() throws Exception {
+        checkRecoveryWithoutWalTruncate(4);
+    }
+
+    /**
+     * Checking that when switching WAL archive off -> on,
+     * reducing WAL segments at the start of the node
+     * and truncating some WAL segments, the recovery will be consistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDecreaseWalSegmentsWithTruncate0() throws Exception {
+        checkRecoveryWithWalTruncate(5);
+    }
+
+    /**
+     * Checking that when switching WAL archive off -> on,
+     * reducing WAL segments at the start of the node
+     * and truncating some WAL segments, the recovery will be consistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDecreaseWalSegmentsWithTruncate1() throws Exception {
+        checkRecoveryWithWalTruncate(6);
+    }
+
+    /**
+     * Checking that when switching WAL archive off -> on
+     * and truncating some WAL segments, the recovery will be consistent.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNotChangeWalSegmentsWithTruncate() throws Exception {
+        checkRecoveryWithWalTruncate(10);
+    }
+
+    /**
+     * Checking the consistency of recovery from a WAL when switching
+     * WAL archive off -> on and changing the number of segments on node restart.
+     * With truncate WAL segments.
+     *
+     * @param segments Segment count on node restart.
+     * @throws Exception If failed.
+     */
+    private void checkRecoveryWithWalTruncate(int segments) throws Exception {
+        IgniteEx n = startGrid(0, cfg -> {
+            cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH);
+        });
+
+        AtomicInteger key = new AtomicInteger();
+
+        dbMgr(n).checkpointReadLock();
+
+        try {
+            fill(n, 6, key);
+
+            // Protection against deleting WAL segments.
+            assertTrue(walMgr(n).reserve(new WALPointer(5, 0, 0)));
+        }
+        finally {
+            dbMgr(n).checkpointReadUnlock();
+        }
+
+        forceCheckpoint();
+        assertTrue(waitForCondition(() -> walMgr(n).lastTruncatedSegment() == 4, getTestTimeout()));
+
+        // Guaranteed recovery from WAL segments.
+        dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+        fill(n, 2, key);
+
+        stopAllGrids();
+
+        IgniteEx n0 = startGrid(0, cfg -> {
+            cfg.getDataStorageConfiguration().setWalSegments(segments);
+        });
+
+        assertEquals(key.get(), n0.cache(DEFAULT_CACHE_NAME).size());
+    }
+
+    /**
+     * Checking the consistency of recovery from a WAL when switching
+     * WAL archive off -> on and changing the number of segments on node restart.
+     * Without truncate WAL segments.
+     *
+     * @param segments Segment count on node restart.
+     * @throws Exception If failed.
+     */
+    private void checkRecoveryWithoutWalTruncate(int segments) throws Exception {
+        IgniteEx n = startGrid(0, cfg -> {
+            cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH);
+        });
+
+        // Protection against deleting WAL segments.
+        assertTrue(walMgr(n).reserve(new WALPointer(0, 0, 0)));
+
+        AtomicInteger key = new AtomicInteger();
+
+        fill(n, 3, key);
+        forceCheckpoint();
+
+        // Guaranteed recovery from WAL segments.
+        dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+        fill(n, 3, key);
+
+        stopAllGrids();
+
+        n = startGrid(0, cfg -> {
+            cfg.getDataStorageConfiguration().setWalSegments(segments);
+        });
+
+        assertEquals(key.get(), n.cache(DEFAULT_CACHE_NAME).size());
+    }
+
+    /**
+     * Puts entries into the default cache until the current WAL segment index advances by {@code segments}.
+     *
+     * @param n Node.
+     * @param segments Number of segments.
+     * @param key Key counter, incremented once per put; the value before the increment is used as the cache key.
+     */
+    private void fill(IgniteEx n, int segments, AtomicInteger key) {
+        long end = walMgr(n).currentSegment() + segments;
+        int i = 0;
+
+        while (walMgr(n).currentSegment() < end) {
+            int k = key.getAndIncrement();
+            int[] arr = new int[64];
+
+            Arrays.fill(arr, k);
+
+            n.cache(DEFAULT_CACHE_NAME).put(k, arr);
+
+            i++;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Fill [keys=" + i + ", totalKeys=" + key.get() +
+                ", segNum=" + segments + ", currSeg=" + walMgr(n).currentSegment() + ']');
+        }
+    }
+
+    /**
+     * Getting WAL manager of node.
+     *
+     * @param n Node.
+     * @return WAL manager.
+     */
+    private FileWriteAheadLogManager walMgr(IgniteEx n) {
+        return (FileWriteAheadLogManager)n.context().cache().context().wal();
+    }
+
+    /**
+     * Getting db manager of node.
+     *
+     * @param n Node.
+     * @return Db manager.
+     */
+    private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+        return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d286c126..a0a8830 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1038,6 +1038,22 @@ public abstract class GridAbstractTest extends JUnitAssertAware {
      * Starts new grid with given index.
      *
      * @param idx Index of the grid to start.
+     * @param cfgC Configuration mutator. Can be used to avoid overcomplication of {@link #getConfiguration()}.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgC) throws Exception {
+        return startGrid(getTestIgniteInstanceName(idx), cfg -> {
+            cfgC.accept(cfg);
+
+            return cfg;
+        });
+    }
+
+    /**
+     * Starts new grid with given index.
+     *
+     * @param idx Index of the grid to start.
      * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
      * @return Started grid.
      * @throws Exception If anything failed.
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index ad60059..75aaed9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWa
 import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest;
 import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest;
 import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationPersistentTest;
@@ -137,6 +138,8 @@ public class IgnitePdsTestSuite {
 
         GridTestUtils.addTestIfNeeded(suite, SegmentAwareTest.class, ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, WalArchiveConsistencyTest.class, ignoredTests);
+
         return suite;
     }